From f7f1a37c351cd737fed07c7d2c9cc1e95df0f76d Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 6 Feb 2023 20:03:36 +0800 Subject: [PATCH] fix allocate hash window function use hash-based sort bug --- src/sql/optimizer/ob_log_plan.cpp | 3 + src/sql/optimizer/ob_optimizer_util.cpp | 26 --- src/sql/optimizer/ob_optimizer_util.h | 5 - src/sql/optimizer/ob_select_log_plan.cpp | 200 +++++++++++------------ src/sql/optimizer/ob_select_log_plan.h | 28 +++- 5 files changed, 123 insertions(+), 139 deletions(-) diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index fb6a9fccd1..dd1e402f44 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -12804,6 +12804,9 @@ int ObLogPlan::create_hash_sortkey(const int64_t part_cnt, ObExecContext *exec_ctx = get_optimizer_context().get_exec_ctx(); if (OB_FAIL(expr_factory.create_raw_expr(T_FUN_SYS_HASH, hash_expr))) { LOG_WARN("failed to create raw expr", K(ret)); + } else if (OB_UNLIKELY(part_cnt > order_keys.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected order_keys count", K(ret), K(part_cnt), K(order_keys)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < part_cnt; ++i) { if (OB_FAIL(hash_expr->add_param_expr(order_keys.at(i).expr_))) { diff --git a/src/sql/optimizer/ob_optimizer_util.cpp b/src/sql/optimizer/ob_optimizer_util.cpp index 1cb0e22a76..30082533b9 100644 --- a/src/sql/optimizer/ob_optimizer_util.cpp +++ b/src/sql/optimizer/ob_optimizer_util.cpp @@ -2311,32 +2311,6 @@ int ObOptimizerUtil::is_same_table( return ret; } -int ObOptimizerUtil::get_partition_count(const ObLogicalOperator *top, - const ObIArray &partition_exprs, - const int64_t prefix_pos, - int64_t &part_cnt) -{ - int ret = OB_SUCCESS; - ObSEArray pruning_partition_exprs; - if (OB_ISNULL(top) || OB_ISNULL(top->get_plan())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(top), K(ret)); - } else if (prefix_pos > 0 || partition_exprs.empty()) { - part_cnt = 0; - } else if (OB_FAIL(simplify_ordered_exprs(top->get_fd_item_set(), - top->get_output_equal_sets(), - top->get_output_const_exprs(), - top->get_plan()->get_onetime_query_refs(), - partition_exprs, - pruning_partition_exprs))) { - LOG_WARN("failed to simplify ordered exprs", K(ret)); - } else { - // 简化后 partition_exprs 中的表达式在 sort_keys 中都有效。 - part_cnt = pruning_partition_exprs.count(); - } - return ret; -} - int ObOptimizerUtil::get_default_directions(const int64_t direction_num, ObIArray &directions) { diff --git a/src/sql/optimizer/ob_optimizer_util.h b/src/sql/optimizer/ob_optimizer_util.h index ca9452f97f..56c4661c91 100644 --- a/src/sql/optimizer/ob_optimizer_util.h +++ b/src/sql/optimizer/ob_optimizer_util.h @@ -413,11 +413,6 @@ public: ObRawExpr *child_expr, int64_t child_idx); - static int get_partition_count(const ObLogicalOperator *top, - const ObIArray &partition_exprs, - const int64_t prefix_pos, - int64_t &part_cnt); - static int get_default_directions(const int64_t direction_num, ObIArray &directions); diff --git a/src/sql/optimizer/ob_select_log_plan.cpp b/src/sql/optimizer/ob_select_log_plan.cpp index a0b2c3c9b1..a738ba9d8f 100644 --- a/src/sql/optimizer/ob_select_log_plan.cpp +++ b/src/sql/optimizer/ob_select_log_plan.cpp @@ -4431,23 +4431,26 @@ int ObSelectLogPlan::generate_window_functions_plan(const ObIArray methods; ObSEArray sort_key_ndvs; ObSEArray, 8> pby_oby_prefixes; - if (distributed) { - if (OB_FAIL(prepare_for_split_winfuncs( + bool has_non_parallel_wf = false; + if (OB_FAIL(prepare_for_split_winfuncs( local_plans.at(0).plan_tree_, adjusted_winfunc_exprs, current_sort_keys, sort_key_ndvs, pby_oby_prefixes))) { - LOG_WARN("prepare_for_split_winfuncs failed", K(ret)); - } else if (OB_FAIL(split_winfuncs_by_dist_method(local_plans.at(0).plan_tree_, + LOG_WARN("prepare_for_split_winfuncs failed", K(ret)); + } else if (distributed && + OB_FAIL(split_winfuncs_by_dist_method(local_plans.at(0).plan_tree_, adjusted_winfunc_exprs, remaining_exprs, stmt_func_idx, sort_key_ndvs, pby_oby_prefixes, split, - methods))) { - // split the window function expressions of one group (group is detected by get_next_group_window_exprs()) into multi window function operator by distribute method (HASH or RANGE) - LOG_WARN("split window function by distribute method failed", K(ret)); - } - } else { + methods, + has_non_parallel_wf))) { + // split the window function expressions of one group (group is detected by get_next_group_window_exprs()) into multi window function operator by distribute method (HASH or RANGE) + LOG_WARN("split window function by distribute method failed", K(ret)); + } else if (!distributed || has_non_parallel_wf) { + split.reuse(); + methods.reuse(); if (OB_FAIL(split.push_back(adjusted_winfunc_exprs.count())) || OB_FAIL(methods.push_back(WinDistAlgo::NONE))) { LOG_WARN("array push back failed", K(ret)); @@ -4462,6 +4465,7 @@ int ObSelectLogPlan::generate_window_functions_plan(const ObIArray pushdown_info; + ObSEArray partition_exprs; if (distributed) { ObArrayHelper> splitted_pby_oby_prefixes( end - start, @@ -4478,11 +4482,11 @@ int ObSelectLogPlan::generate_window_functions_plan(const ObIArray partition_exprs; if (NULL == top) { ret = OB_ERR_UNEXPECTED; LOG_WARN("got NULL plan tree", K(ret)); @@ -4505,11 +4508,12 @@ int ObSelectLogPlan::generate_window_functions_plan(const ObIArray 0 && OB_FAIL(ObOptimizerUtil::check_need_sort(current_sort_keys, @@ -4526,10 +4530,14 @@ int ObSelectLogPlan::generate_window_functions_plan(const ObIArray, 8> pby_oby_prefixes, + const int64_t start, + const bool not_split, + const ObIArray &partition_exprs, + const int64_t prefix_pos, + int64_t &part_cnt) +{ + int ret = OB_SUCCESS; + part_cnt = 0; + if (OB_UNLIKELY(pby_oby_prefixes.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get empty pby_oby_prefixes", K(ret), K(pby_oby_prefixes.count())); + } else if (prefix_pos > 0 || partition_exprs.empty()) { + part_cnt = 0; + } else if (not_split) { + part_cnt = pby_oby_prefixes.at(0).first; + for (int64_t i = 1; OB_SUCC(ret) && i < pby_oby_prefixes.count(); ++i) { + if (pby_oby_prefixes.at(i).first < part_cnt) { + part_cnt = pby_oby_prefixes.at(i).first; + } + } + } else if (OB_UNLIKELY(start < 0 || start >= pby_oby_prefixes.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected pby_oby_prefixes count ", K(ret), K(start), K(pby_oby_prefixes.count())); + } else { + part_cnt = pby_oby_prefixes.at(start).first; + } + return ret; +} + int ObSelectLogPlan::get_next_group_window_exprs(const ObLogicalOperator *top, ObIArray &remaining_exprs, ObIArray ¤t_sort_keys, @@ -4952,7 +4992,8 @@ int ObSelectLogPlan::split_winfuncs_by_dist_method( const ObIArray &sort_key_ndvs, const ObIArray> &pby_oby_prefixes, common::ObIArray &split, - common::ObIArray &methods) + common::ObIArray &methods, + bool &has_non_parallel_wf) { int ret = OB_SUCCESS; if (OB_ISNULL(top)) { @@ -4969,8 +5010,7 @@ int ObSelectLogPlan::split_winfuncs_by_dist_method( const int64_t WF_CARD_DOP_RADIO = 256; const int64_t dop = get_optimizer_context().get_parallel(); int64_t prev_method = -1; - bool has_non_parallel_wf = false; - for (int64_t idx = 0; OB_SUCC(ret) && idx < winfunc_exprs.count(); ) { + for (int64_t idx = 0; OB_SUCC(ret) && !has_non_parallel_wf && idx < winfunc_exprs.count(); ) { const int64_t pby_cnt = pby_oby_prefixes.at(idx).first; const int64_t pby_oby_cnt = pby_oby_prefixes.at(idx).second; const double pby_ndv = pby_cnt == 0 ? 0 : sort_key_ndvs.at(pby_cnt - 1); @@ -5080,12 +5120,6 @@ int ObSelectLogPlan::split_winfuncs_by_dist_method( } } } - if (OB_SUCC(ret) && has_non_parallel_wf) { - split.reuse(); - methods.reuse(); - OZ(split.push_back(winfunc_exprs.count())); - OZ(methods.push_back(WinDistAlgo::NONE)); - } } return ret; } @@ -5103,55 +5137,27 @@ int ObSelectLogPlan::split_winfuncs_by_dist_method( int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, const ObIArray &winfunc_exprs, const ObIArray &sort_keys, + const ObIArray &partition_exprs, WinDistAlgo dist_method, const bool is_pushdown, ObOpPseudoColumnRawExpr *wf_aggr_status_expr, - const ObIArray &pushdown_info) + const ObIArray &pushdown_info, + bool need_sort, + int64_t prefix_pos, + int64_t part_cnt) { int ret = OB_SUCCESS; - bool need_sort = false; - int64_t prefix_pos = 0; ObExchangeInfo exch_info; bool single_part_parallel = false; bool is_partition_wise = false; - ObSEArray partition_exprs; double pby_ndv = 1.0; ObSEArray range_dist_keys; int64_t range_dist_pby_prefix = 0; bool range_distribution = false; - int64_t part_cnt = 0; LOG_DEBUG("try create merge distribute function plan", K(dist_method), K(sort_keys), K(winfunc_exprs)); if (OB_ISNULL(top) || OB_UNLIKELY(winfunc_exprs.empty())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpecated error", K(top), K(winfunc_exprs.count()), K(ret)); - } else if (OB_FAIL(ObOptimizerUtil::check_need_sort(sort_keys, - top->get_op_ordering(), - top->get_fd_item_set(), - top->get_output_equal_sets(), - top->get_output_const_exprs(), - get_onetime_query_refs(), - top->get_is_at_most_one_row(), - need_sort, - prefix_pos))) { - LOG_WARN("failed to check need sort", K(ret)); - } else if (OB_FAIL(get_window_function_partition_exprs(winfunc_exprs, - partition_exprs))) { - LOG_WARN("failed to get partition exprs", K(ret)); - } else if (OB_FAIL(ObOptimizerUtil::get_partition_count( - top, partition_exprs, prefix_pos, part_cnt))) { - LOG_WARN("failed to get partition count", K(ret)); - } else if (need_sort && part_cnt > 0 && - OB_FAIL(ObOptimizerUtil::check_need_sort(sort_keys, - top->get_op_ordering(), - top->get_fd_item_set(), - top->get_output_equal_sets(), - top->get_output_const_exprs(), - get_onetime_query_refs(), - top->get_is_at_most_one_row(), - need_sort, - prefix_pos, - part_cnt))) { - LOG_WARN("failed to check need sort", K(ret)); } else { // range distribution if (WinDistAlgo::RANGE == dist_method @@ -5286,16 +5292,11 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, LOG_WARN("failed to allocate_window_function_as_top", K(ret)); } /*else if (FALSE_IT(wf_aggr_status_expr = static_cast(top)->get_aggr_status_expr())) { - } */else if (OB_FAIL(get_pushdown_window_function_partition_exprs( - winfunc_exprs, partition_exprs))) { - LOG_WARN("failed to get_window_function_partition_exprs_for_push_down", K(ret)); - } else if (OB_FAIL(get_grouping_style_exchange_info(partition_exprs, - top->get_output_equal_sets(), - exch_info))) { - LOG_WARN("failed to get grouping style exchange info", K(ret)); - } else if (FALSE_IT(exch_info.is_wf_hybrid_ = true)) { - } else if (FALSE_IT(exch_info.wf_hybrid_aggr_status_expr_ = wf_aggr_status_expr)) { - // use the value of wf_aggr_status_expr to decide how to distribute in ex op + } */else if (OB_FAIL(get_pushdown_window_function_exchange_info(winfunc_exprs, + top->get_output_equal_sets(), + wf_aggr_status_expr, + exch_info))) { + LOG_WARN("failed to get pushdown window function exchange info", K(ret)); } else if (OB_FAIL(tmp_sort_keys.assign(sort_keys))) { LOG_WARN("failed to assign sort_keys to tmp_sort_keys", K(ret)); } else if (OB_FAIL(tmp_sort_keys.push_back( @@ -5362,9 +5363,9 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, * 这里窗口函数会按照分组表达式数量进行排序(只统计非常量的分组表达式数量,数量多的排在前)。 */ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, - ObIArray &adjusted_winfunc_exprs, + const ObIArray &adjusted_winfunc_exprs, const ObIArray &sort_keys, - ObIArray &partition_exprs, + const ObIArray &partition_exprs, const int64_t part_cnt, const bool is_pushdown, ObOpPseudoColumnRawExpr *wf_aggr_status_expr, @@ -5373,23 +5374,20 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, int ret = OB_SUCCESS; ObExchangeInfo exch_info; OrderItem hash_sortkey; - bool match_parallel = false; bool is_partition_wise = false; const ObArray range_dist_keys; const int64_t range_dist_pby_prefix = 0; - if (OB_ISNULL(top)) { + LOG_DEBUG("create hash window function plan", K(part_cnt), K(sort_keys), K(adjusted_winfunc_exprs)); + if (OB_ISNULL(top) || OB_UNLIKELY(partition_exprs.empty())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(top), K(ret)); - } else if (top->is_distributed() && !partition_exprs.empty() && + LOG_WARN("get unexpected error", K(ret), K(top), K(partition_exprs.empty())); + } else if (top->is_distributed() && OB_FAIL(top->check_sharding_compatible_with_reduce_expr(partition_exprs, is_partition_wise))) { LOG_WARN("failed to check if sharding compatible", K(ret)); - } else if (top->is_distributed() && !is_partition_wise && partition_exprs.empty() && - OB_FAIL(match_window_function_parallel(adjusted_winfunc_exprs, match_parallel))) { - LOG_WARN("failed to check if match window function parallel", K(ret)); } else if (part_cnt > 0 && OB_FAIL(create_hash_sortkey(part_cnt, sort_keys, hash_sortkey))) { LOG_WARN("failed to create hash sort key", K(ret), K(part_cnt), K(sort_keys)); - } else if (!top->is_distributed() || is_partition_wise || match_parallel) { + } else if (!top->is_distributed() || is_partition_wise) { exch_info.dist_method_ = ObPQDistributeMethod::NONE; if (OB_FAIL(allocate_sort_and_exchange_as_top(top, exch_info, @@ -5402,7 +5400,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, part_cnt > 0 ? &hash_sortkey : NULL))) { LOG_WARN("failed to allocate sort as top", K(ret)); } else if (OB_FAIL(allocate_window_function_as_top(adjusted_winfunc_exprs, - match_parallel, + false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, range_dist_keys, @@ -5428,7 +5426,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, part_cnt > 0 ? &hash_sortkey : NULL))) { LOG_WARN("failed to allocate sort as top", K(ret)); } else if (OB_FAIL(allocate_window_function_as_top(adjusted_winfunc_exprs, - match_parallel, + false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, range_dist_keys, @@ -5463,7 +5461,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, } else { const OrderItem &hash_sortkey = static_cast(top)->get_hash_sortkey(); if (OB_FAIL(allocate_window_function_as_top(adjusted_winfunc_exprs, - match_parallel, + false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::PARTICIPATOR, range_dist_keys, @@ -5472,16 +5470,11 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, wf_aggr_status_expr, pushdown_info))) { LOG_WARN("failed to allocate_window_function_as_top", K(ret)); - } else if (OB_FAIL(get_pushdown_window_function_partition_exprs( - adjusted_winfunc_exprs, partition_exprs))) { - LOG_WARN("failed to get_window_function_partition_exprs_for_push_down", K(ret)); - } else if (OB_FAIL(get_grouping_style_exchange_info(partition_exprs, - top->get_output_equal_sets(), - exch_info))) { - LOG_WARN("failed to get grouping style exchange info", K(ret)); - } else if (FALSE_IT(exch_info.is_wf_hybrid_ = true)) { - } else if (FALSE_IT(exch_info.wf_hybrid_aggr_status_expr_ = wf_aggr_status_expr)) { - // use the value of wf_aggr_status_expr to decide how to distribute in ex op + } else if (OB_FAIL(get_pushdown_window_function_exchange_info(adjusted_winfunc_exprs, + top->get_output_equal_sets(), + wf_aggr_status_expr, + exch_info))) { + LOG_WARN("failed to get pushdown window function exchange info", K(ret)); } else if (OB_FAIL(tmp_sort_keys.push_back(hash_sortkey))) { LOG_WARN("failed to push_back extra sort key hash_sortkey of aggr status to tmp_sort_keys", K(ret)); @@ -5506,7 +5499,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, top->get_is_local_order()))) { LOG_WARN("failed to allocate sort as top", K(ret)); } else if (OB_FAIL(allocate_window_function_as_top(adjusted_winfunc_exprs, - match_parallel, + false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::CONSOLIDATOR, range_dist_keys, @@ -5661,21 +5654,28 @@ int ObSelectLogPlan::check_wf_pushdown_supported(ObWinFunRawExpr *win_expr, return ret; } -// get the pby expr of the first win_expr, the pby col count of first pby expr is the most -int ObSelectLogPlan::get_pushdown_window_function_partition_exprs( +int ObSelectLogPlan::get_pushdown_window_function_exchange_info( const ObIArray &win_exprs, - ObIArray &partition_exprs) + const EqualSets & equal_sets, + ObOpPseudoColumnRawExpr *wf_aggr_status_expr, + ObExchangeInfo &exch_info) { int ret = OB_SUCCESS; - partition_exprs.reuse(); if (0 == win_exprs.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("count of win_exprs is zero", K(ret)); } else if (OB_ISNULL(win_exprs.at(0))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(partition_exprs.assign(win_exprs.at(0)->get_partition_exprs()))) { - LOG_WARN("fail to assign", K(ret)); + // get the pby expr of the first win_expr, the pby col count of first pby expr is the most + } else if (OB_FAIL(get_grouping_style_exchange_info(win_exprs.at(0)->get_partition_exprs(), + equal_sets, + exch_info))) { + LOG_WARN("failed to get grouping style exchange info", K(ret)); + } else { + exch_info.is_wf_hybrid_ = true; + // use the value of wf_aggr_status_expr to decide how to distribute in ex op + exch_info.wf_hybrid_aggr_status_expr_ = wf_aggr_status_expr; } return ret; } diff --git a/src/sql/optimizer/ob_select_log_plan.h b/src/sql/optimizer/ob_select_log_plan.h index 1cb4f942e8..6917d49f16 100644 --- a/src/sql/optimizer/ob_select_log_plan.h +++ b/src/sql/optimizer/ob_select_log_plan.h @@ -503,7 +503,8 @@ private: const ObIArray &sort_key_ndvs, const ObIArray> &pby_oby_prefixes, common::ObIArray &split, - common::ObIArray &methods); + common::ObIArray &methods, + bool &has_non_parallel_wf); // Generate sort keys for window function // @param[out] order_item generated sort keys @@ -528,6 +529,13 @@ private: int64_t &pby_prefix, int64_t &pby_oby_prefix); + int get_partition_count(const ObSEArray, 8> pby_oby_prefixes, + const int64_t start, + const bool not_split, + const ObIArray &partition_exprs, + const int64_t prefix_pos, + int64_t &part_cnt); + /** * @brief set_default_sort_directions * 确定窗口函数partition表达式的排序方向 @@ -544,15 +552,19 @@ private: int create_merge_window_function_plan(ObLogicalOperator *&top, const ObIArray &winfunc_exprs, const ObIArray &sort_keys, + const ObIArray &partition_exprs, WinDistAlgo dist_method, const bool is_pushdown, ObOpPseudoColumnRawExpr *wf_aggr_status_expr, - const ObIArray &pushdown_info); + const ObIArray &pushdown_info, + bool need_sort, + int64_t prefix_pos, + int64_t part_cnt); int create_hash_window_function_plan(ObLogicalOperator *&top, - ObIArray &adjusted_winfunc_exprs, + const ObIArray &adjusted_winfunc_exprs, const ObIArray &sort_keys, - ObIArray &partition_exprs, + const ObIArray &partition_exprs, const int64_t part_cnt, const bool is_pushdown, ObOpPseudoColumnRawExpr *wf_aggr_status_expr, @@ -570,10 +582,10 @@ private: int check_wf_pushdown_supported(ObWinFunRawExpr *win_expr, bool &supported); - // partition_exprs for push_down window_function op is the exprs of the smallest partition - int get_pushdown_window_function_partition_exprs( - const ObIArray &win_exprs, - ObIArray &partition_exprs); + int get_pushdown_window_function_exchange_info(const ObIArray &win_exprs, + const EqualSets & equal_sets, + ObOpPseudoColumnRawExpr *wf_aggr_status_expr, + ObExchangeInfo &exch_info); int get_window_function_partition_exprs(const ObIArray &win_exprs, ObIArray &partition_exprs);