fix window function op ordering bug
This commit is contained in:
@ -5628,9 +5628,14 @@ int ObStaticEngineCG::generate_spec(ObLogWindowFunction &op, ObWindowFunctionSpe
|
||||
LOG_ERROR("wrong number of children", K(ret), K(op.get_num_of_child()));
|
||||
}
|
||||
if (OB_SUCC(ret) && op.is_range_dist_parallel()) {
|
||||
OZ(fill_sort_info(op.get_rd_sort_keys(), spec.rd_sort_collations_, rd_expr));
|
||||
OZ(fill_sort_funcs(spec.rd_sort_collations_, spec.rd_sort_cmp_funcs_, rd_expr));
|
||||
OZ(append(all_expr, rd_expr));
|
||||
ObSEArray<OrderItem, 8> rd_sort_keys;
|
||||
if (OB_FAIL(op.get_rd_sort_keys(rd_sort_keys))) {
|
||||
LOG_WARN("Get unexpected null", K(ret));
|
||||
} else {
|
||||
OZ(fill_sort_info(rd_sort_keys, spec.rd_sort_collations_, rd_expr));
|
||||
OZ(fill_sort_funcs(spec.rd_sort_collations_, spec.rd_sort_cmp_funcs_, rd_expr));
|
||||
OZ(append(all_expr, rd_expr));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
#include "ob_opt_est_cost.h"
|
||||
#include "sql/optimizer/ob_join_order.h"
|
||||
#include "common/ob_smart_call.h"
|
||||
#include "sql/optimizer/ob_log_exchange.h"
|
||||
|
||||
#define PRINT_BOUND(bound_name, bound) \
|
||||
if (OB_SUCC(ret)) { \
|
||||
@ -77,9 +78,14 @@ using namespace oceanbase::sql::log_op_def;
|
||||
int ObLogWindowFunction::get_op_exprs(ObIArray<ObRawExpr*> &all_exprs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
FOREACH_CNT_X(it, rd_sort_keys_, OB_SUCC(ret)) {
|
||||
if (OB_FAIL(all_exprs.push_back(it->expr_))) {
|
||||
LOG_WARN("array push back failed", K(ret));
|
||||
if (OB_UNLIKELY(sort_keys_.count() < rd_sort_keys_cnt_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected params", K(ret), K(sort_keys_.count()), K(rd_sort_keys_cnt_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < rd_sort_keys_cnt_; i++) {
|
||||
if (OB_FAIL(all_exprs.push_back(sort_keys_.at(i).expr_))) {
|
||||
LOG_WARN("array push back failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -336,11 +342,18 @@ int ObLogWindowFunction::get_win_partition_intersect_exprs(ObIArray<ObWinFunRawE
|
||||
int ObLogWindowFunction::compute_op_ordering()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLogicalOperator *child = NULL;
|
||||
if (OB_FAIL(ObLogicalOperator::compute_op_ordering())) {
|
||||
LOG_WARN("failed to compute op ordering", K(ret));
|
||||
} else if (OB_ISNULL(child = get_child(ObLogicalOperator::first_child))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("first child is null", K(ret), K(child));
|
||||
} else if (!single_part_parallel_) {
|
||||
is_local_order_ = (range_dist_parallel_ || is_fully_paratition_wise())
|
||||
&& !get_op_ordering().empty();
|
||||
is_local_order_ = (range_dist_parallel_ || is_fully_paratition_wise()
|
||||
|| (get_sort_keys().empty()
|
||||
&& LOG_EXCHANGE == child->get_type()
|
||||
&& child->get_is_local_order())
|
||||
) && !get_op_ordering().empty();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -430,7 +443,7 @@ int ObLogWindowFunction::inner_replace_op_exprs(
|
||||
const common::ObIArray<std::pair<ObRawExpr *, ObRawExpr*>> &to_replace_exprs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
FOREACH_X(key, rd_sort_keys_, OB_SUCC(ret)) {
|
||||
FOREACH_X(key, sort_keys_, OB_SUCC(ret)) {
|
||||
if (OB_FAIL(replace_expr_action(to_replace_exprs, key->expr_))) {
|
||||
LOG_WARN("replace expr failed", K(ret));
|
||||
}
|
||||
@ -454,3 +467,20 @@ int ObLogWindowFunction::inner_replace_op_exprs(
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogWindowFunction::get_rd_sort_keys(common::ObIArray<OrderItem> &rd_sort_keys)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
rd_sort_keys.reuse();
|
||||
if (OB_UNLIKELY(sort_keys_.count() < rd_sort_keys_cnt_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected params", K(ret), K(sort_keys_.count()), K(rd_sort_keys_cnt_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < rd_sort_keys_cnt_; i++) {
|
||||
if (OB_FAIL(rd_sort_keys.push_back(sort_keys_.at(i)))) {
|
||||
LOG_WARN("array push back failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -32,6 +32,7 @@ namespace sql
|
||||
single_part_parallel_(false),
|
||||
range_dist_parallel_(false),
|
||||
role_type_(WindowFunctionRoleType::NORMAL),
|
||||
rd_sort_keys_cnt_(0),
|
||||
rd_pby_sort_cnt_(0),
|
||||
wf_aggr_status_expr_(NULL)
|
||||
{}
|
||||
@ -69,14 +70,16 @@ namespace sql
|
||||
bool is_push_down() { return PARTICIPATOR == role_type_|| CONSOLIDATOR == role_type_; }
|
||||
bool is_participator() { return PARTICIPATOR == role_type_; }
|
||||
bool is_consolidator() { return CONSOLIDATOR == role_type_; }
|
||||
int set_rd_sort_keys(const common::ObIArray<OrderItem> &sort_keys)
|
||||
int get_rd_sort_keys(common::ObIArray<OrderItem> &rd_sort_keys);
|
||||
int set_sort_keys(const common::ObIArray<OrderItem> &sort_keys)
|
||||
{
|
||||
return rd_sort_keys_.assign(sort_keys);
|
||||
return sort_keys_.assign(sort_keys);
|
||||
}
|
||||
const common::ObIArray<OrderItem> &get_rd_sort_keys() const
|
||||
const common::ObIArray<OrderItem> &get_sort_keys() const
|
||||
{
|
||||
return rd_sort_keys_;
|
||||
return sort_keys_;
|
||||
}
|
||||
void set_rd_sort_keys_cnt(const int64_t cnt) { rd_sort_keys_cnt_ = cnt; }
|
||||
int set_pushdown_info(const common::ObIArray<bool> &pushdown_info)
|
||||
{
|
||||
return pushdown_info_.assign(pushdown_info);
|
||||
@ -121,8 +124,10 @@ namespace sql
|
||||
// https://yuque.antfin.com/ob/sql/wf_adaptive_parallel_execution#69270e6f
|
||||
WindowFunctionRoleType role_type_;
|
||||
|
||||
// sort keys for range distributed parallel.
|
||||
common::ObSEArray<OrderItem, 8, common::ModulePageAllocator, true> rd_sort_keys_;
|
||||
// sort keys needed for window function
|
||||
common::ObSEArray<OrderItem, 8, common::ModulePageAllocator, true> sort_keys_;
|
||||
// sort keys count for range distributed parallel.
|
||||
int64_t rd_sort_keys_cnt_;
|
||||
// the first %rd_pby_sort_cnt_ of %rd_sort_keys_ is the partition by of window function.
|
||||
int64_t rd_pby_sort_cnt_;
|
||||
|
||||
|
||||
@ -1226,7 +1226,8 @@ int ObSelectLogPlan::allocate_window_function_as_top(const ObIArray<ObWinFunRawE
|
||||
const bool match_parallel,
|
||||
const bool is_partition_wise,
|
||||
const int32_t role_type,
|
||||
const ObIArray<OrderItem> &range_dist_keys,
|
||||
const ObIArray<OrderItem> &sort_keys,
|
||||
const int64_t range_dist_keys_cnt,
|
||||
const int64_t range_dist_pby_prefix,
|
||||
ObLogicalOperator *&top,
|
||||
ObOpPseudoColumnRawExpr *wf_aggr_status_expr,
|
||||
@ -1243,16 +1244,15 @@ int ObSelectLogPlan::allocate_window_function_as_top(const ObIArray<ObWinFunRawE
|
||||
LOG_ERROR("allocate memory for ObLogWindowFunction failed", K(ret));
|
||||
} else if (OB_FAIL(append(window_function->get_window_exprs(), win_exprs))) {
|
||||
LOG_WARN("failed to add window expr", K(ret));
|
||||
|
||||
} else if (!range_dist_keys.empty()
|
||||
&& OB_FAIL(window_function->set_rd_sort_keys(range_dist_keys))) {
|
||||
} else if (OB_FAIL(window_function->set_sort_keys(sort_keys))) {
|
||||
LOG_WARN("set range distribution sort keys failed", K(ret));
|
||||
} else {
|
||||
window_function->set_rd_sort_keys_cnt(range_dist_keys_cnt);
|
||||
window_function->set_single_part_parallel(match_parallel);
|
||||
window_function->set_is_partition_wise(is_partition_wise);
|
||||
window_function->set_child(ObLogicalOperator::first_child, top);
|
||||
window_function->set_role_type(ObLogWindowFunction::WindowFunctionRoleType(role_type));
|
||||
if (!range_dist_keys.empty()) {
|
||||
if (range_dist_keys_cnt > 0) {
|
||||
window_function->set_ragne_dist_parallel(true);
|
||||
window_function->set_rd_pby_sort_cnt(range_dist_pby_prefix);
|
||||
}
|
||||
@ -5158,7 +5158,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
bool single_part_parallel = false;
|
||||
bool is_partition_wise = false;
|
||||
double pby_ndv = 1.0;
|
||||
ObSEArray<OrderItem, 8> range_dist_keys;
|
||||
int64_t range_dist_keys_cnt = 0;
|
||||
int64_t range_dist_pby_prefix = 0;
|
||||
bool range_distribution = false;
|
||||
LOG_DEBUG("try create merge distribute function plan", K(dist_method), K(sort_keys), K(winfunc_exprs));
|
||||
@ -5170,6 +5170,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
if (WinDistAlgo::RANGE == dist_method
|
||||
|| WinDistAlgo::LIST == dist_method) {
|
||||
int64_t pby_oby_prefix = 0;
|
||||
ObSEArray<OrderItem, 8> range_dist_keys;
|
||||
// al range distribute window function has the same pby, pby+oby sort prefix
|
||||
OZ(get_winfunc_pby_oby_sort_prefix(top,
|
||||
winfunc_exprs.at(0),
|
||||
@ -5179,6 +5180,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < pby_oby_prefix; i++) {
|
||||
OZ(range_dist_keys.push_back(sort_keys.at(i)));
|
||||
}
|
||||
range_dist_keys_cnt = pby_oby_prefix;
|
||||
need_sort = false;
|
||||
prefix_pos = 0;
|
||||
OZ(ObOptimizerUtil::check_need_sort(range_dist_keys,
|
||||
@ -5228,7 +5230,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5242,7 +5245,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5261,7 +5265,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5291,7 +5296,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::PARTICIPATOR,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5320,7 +5326,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::CONSOLIDATOR,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5349,7 +5356,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top,
|
||||
single_part_parallel,
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5382,7 +5390,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top,
|
||||
ObExchangeInfo exch_info;
|
||||
OrderItem hash_sortkey;
|
||||
bool is_partition_wise = false;
|
||||
const ObArray<OrderItem> range_dist_keys;
|
||||
const int64_t range_dist_keys_cnt = 0;
|
||||
const int64_t range_dist_pby_prefix = 0;
|
||||
LOG_DEBUG("create hash window function plan", K(part_cnt), K(sort_keys), K(adjusted_winfunc_exprs));
|
||||
if (OB_ISNULL(top) || OB_UNLIKELY(partition_exprs.empty())) {
|
||||
@ -5410,7 +5418,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top,
|
||||
false, /* match_parallel */
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5436,7 +5445,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top,
|
||||
false, /* match_parallel */
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::NORMAL,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5471,7 +5481,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top,
|
||||
false, /* match_parallel */
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::PARTICIPATOR,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
@ -5509,7 +5520,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top,
|
||||
false, /* match_parallel */
|
||||
is_partition_wise,
|
||||
ObLogWindowFunction::WindowFunctionRoleType::CONSOLIDATOR,
|
||||
range_dist_keys,
|
||||
sort_keys,
|
||||
range_dist_keys_cnt,
|
||||
range_dist_pby_prefix,
|
||||
top,
|
||||
wf_aggr_status_expr,
|
||||
|
||||
@ -596,7 +596,8 @@ private:
|
||||
const bool match_parallel,
|
||||
const bool is_partition_wise,
|
||||
const int32_t role_type,
|
||||
const ObIArray<OrderItem> &range_dist_keys,
|
||||
const ObIArray<OrderItem> &sort_keys,
|
||||
const int64_t range_dist_keys_cnt,
|
||||
const int64_t range_dist_pby_prefix,
|
||||
ObLogicalOperator *&top,
|
||||
ObOpPseudoColumnRawExpr *wf_aggr_status_expr,
|
||||
|
||||
Reference in New Issue
Block a user