[CP] Fix startup filter post bug

This commit is contained in:
obdev
2023-02-24 12:03:57 +00:00
committed by ob-robot
parent 4d06b2886d
commit e615feb42d
9 changed files with 142 additions and 12 deletions

View File

@ -1116,13 +1116,13 @@ int ObLogJoin::allocate_startup_expr_post()
CONNECT_BY_JOIN == join_type_ ||
LEFT_SEMI_JOIN == join_type_ ||
LEFT_ANTI_JOIN == join_type_) {
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) {
if (OB_FAIL(allocate_startup_expr_post(first_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
} else if (RIGHT_OUTER_JOIN == join_type_ ||
RIGHT_SEMI_JOIN == join_type_ ||
RIGHT_ANTI_JOIN == join_type_) {
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(second_child))) {
if (OB_FAIL(allocate_startup_expr_post(second_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
} else if (FULL_OUTER_JOIN == join_type_) {
@ -1253,3 +1253,48 @@ bool ObLogJoin::is_my_exec_expr(const ObRawExpr *expr)
{
return ObOptimizerUtil::find_item(nl_params_, expr);
}
int ObLogJoin::allocate_startup_expr_post(int64_t child_idx)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child", K(ret));
} else if (child->get_startup_exprs().empty()) {
//do nothing
} else {
ObSEArray<ObRawExpr*, 4> non_startup_exprs, new_startup_exprs;
ObIArray<ObRawExpr*> &startup_exprs = child->get_startup_exprs();
for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) {
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
}
} else if (startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) {
bool found = false;
if (is_nlj_with_param_down()
&& OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), get_nl_params(), found))) {
LOG_WARN("fail to check if contain my exec param");
} else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
} else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
}
} else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) {
LOG_WARN("failed to assign exprs", K(ret));
}
}
}
return ret;
}

View File

@ -183,6 +183,7 @@ namespace sql
ObPQDistributeMethod::Type &right_dist_method);
bool is_using_slave_mapping() { return SM_NONE != slave_mapping_type_; }
int allocate_startup_expr_post() override;
int allocate_startup_expr_post(int64_t child_idx) override;
// print outline
virtual int print_outline_data(PlanText &plan_text) override;

View File

@ -552,7 +552,7 @@ int ObLogSubPlanFilter::check_and_set_use_batch()
int ObLogSubPlanFilter::allocate_startup_expr_post()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) {
if (OB_FAIL(allocate_startup_expr_post(first_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
return ret;
@ -720,3 +720,54 @@ int ObLogSubPlanFilter::compute_equal_set()
}
return ret;
}
int ObLogSubPlanFilter::allocate_startup_expr_post(int64_t child_idx)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child", K(ret));
} else if (child->get_startup_exprs().empty()) {
//do nothing
} else {
ObSEArray<ObRawExpr*, 4> non_startup_exprs, new_startup_exprs;
ObIArray<ObRawExpr*> &startup_exprs = child->get_startup_exprs();
ObSEArray<ObExecParamRawExpr*, 4> my_exec_params;
if (OB_FAIL(my_exec_params.assign(onetime_exprs_))) {
LOG_WARN("fail to push back onetime exprs", K(ret));
} else if (OB_FAIL(append(my_exec_params, exec_params_))) {
LOG_WARN("fail to push back exec param exprs", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) {
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
}
} else if (startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) {
bool found = false;
if (!my_exec_params.empty()
&& OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), my_exec_params, found))) {
LOG_WARN("fail to check if contain onetime exec param", K(ret));
} else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
} else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr",K(ret));
}
} else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) {
LOG_WARN("failed to assign exprs", K(ret));
}
}
}
return ret;
}

View File

@ -109,6 +109,8 @@ public:
int allocate_startup_expr_post() override;
int allocate_startup_expr_post(int64_t child_idx) override;
int allocate_subquery_id();
int replace_nested_subquery_exprs(

View File

@ -3903,8 +3903,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx)
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM) ||
startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) {
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
@ -3913,7 +3912,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx)
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(add_startup_exprs(new_startup_exprs))) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else {
//exchange out上面的startup filter保留,用于控制当前dfo提前终止

View File

@ -1360,7 +1360,7 @@ public:
virtual int has_block_parent_for_shj(bool &has_shj);
virtual int allocate_startup_expr_post();
int allocate_startup_expr_post(int64_t child_idx);
virtual int allocate_startup_expr_post(int64_t child_idx);
/**
* Start plan tree traverse
*

View File

@ -7971,6 +7971,36 @@ int ObOptimizerUtil::expr_calculable_by_exprs(const ObRawExpr *src_expr,
return ret;
}
int ObOptimizerUtil::check_contain_my_exec_param(ObRawExpr* expr, const common::ObIArray<ObExecParamRawExpr*> & my_exec_params, bool &contain)
{
int ret = OB_SUCCESS;
bool is_stack_overflow = false;
contain = false;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (OB_FAIL(check_stack_overflow(is_stack_overflow))) {
LOG_WARN("check stack overflow failed", K(ret));
} else if (is_stack_overflow) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("too deep recursive", K(ret));
} else if (!expr->has_flag(CNT_DYNAMIC_PARAM)) {
//do nothing
} else if (expr->is_exec_param_expr()) {
const ObExecParamRawExpr *exec_expr = static_cast<const ObExecParamRawExpr*>(expr);
contain = find_exec_param(my_exec_params, exec_expr);
} else if (expr->is_set_op_expr() || expr->is_query_ref_expr() || expr->is_column_ref_expr()) {
//do nothing
} else {
for (int64_t i = 0; !contain && OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(SMART_CALL(check_contain_my_exec_param(expr->get_param_expr(i), my_exec_params, contain)))) {
LOG_WARN("failed to check contain batch stmt parameter", K(ret));
}
}
}
return ret;
}
/* get the smallest set from which all exprs can be evaluated */
int ObOptimizerUtil::get_minset_of_exprs(const ObIArray<ObRawExpr *> &src_exprs, ObIArray<ObRawExpr *> &min_set) {
int ret = OB_SUCCESS;
@ -8539,4 +8569,4 @@ int ObOptimizerUtil::replace_gen_column(ObLogPlan *log_plan, ObRawExpr *part_exp
}
}
return ret;
}
}

View File

@ -1433,6 +1433,8 @@ public:
ObOptimizerContext &opt_ctx,
ObRawExpr *&calc_part_id_expr);
static int check_contain_my_exec_param(ObRawExpr* expr, const common::ObIArray<ObExecParamRawExpr*> & my_exec_params, bool &contain);
private:
//disallow construct
ObOptimizerUtil();

View File

@ -866,11 +866,11 @@ Outputs & filters:
sort_keys([t1.c1, ASC])
1 - output([t1.c1]), filter(nil), rowset=256
exec_params_(nil), onetime_exprs_([(T_OP_EXISTS, subquery(1))]), init_plan_idxs_(nil), batch_das=false
2 - output([t1.c1]), filter(nil), rowset=256
2 - output([t1.c1]), filter(nil), startup_filter([:0]), rowset=256
3 - output([t1.c1]), filter(nil), rowset=256
dop=2
4 - output([t1.c1]), filter(nil), rowset=256
5 - output([t1.c1]), filter(nil), startup_filter([:0]), rowset=256
5 - output([t1.c1]), filter(nil), rowset=256
access([t1.c1]), partitions(p[0-4])
is_index_back=false, is_global_index=false,
range_key([t1.__pk_increment]), range(MIN ; MAX)always true
@ -920,11 +920,11 @@ Outputs & filters:
sort_keys([t1.c1, ASC])
1 - output([t1.c1]), filter([(T_OP_EXISTS, subquery(2))]), rowset=256
exec_params_([t1.c1]), onetime_exprs_([(T_OP_EXISTS, subquery(1))]), init_plan_idxs_(nil), batch_das=false
2 - output([t1.c1]), filter(nil), rowset=256
2 - output([t1.c1]), filter(nil), startup_filter([:1]), rowset=256
3 - output([t1.c1]), filter(nil), rowset=256
dop=2
4 - output([t1.c1]), filter(nil), rowset=256
5 - output([t1.c1]), filter(nil), startup_filter([:1]), rowset=256
5 - output([t1.c1]), filter(nil), rowset=256
access([t1.c1]), partitions(p[0-4])
is_index_back=false, is_global_index=false,
range_key([t1.__pk_increment]), range(MIN ; MAX)always true