diff --git a/src/sql/optimizer/ob_log_join.cpp b/src/sql/optimizer/ob_log_join.cpp index 6ea0e59b7a..fee433d509 100644 --- a/src/sql/optimizer/ob_log_join.cpp +++ b/src/sql/optimizer/ob_log_join.cpp @@ -1116,13 +1116,13 @@ int ObLogJoin::allocate_startup_expr_post() CONNECT_BY_JOIN == join_type_ || LEFT_SEMI_JOIN == join_type_ || LEFT_ANTI_JOIN == join_type_) { - if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) { + if (OB_FAIL(allocate_startup_expr_post(first_child))) { LOG_WARN("failed to allocate startup expr post", K(ret)); } } else if (RIGHT_OUTER_JOIN == join_type_ || RIGHT_SEMI_JOIN == join_type_ || RIGHT_ANTI_JOIN == join_type_) { - if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(second_child))) { + if (OB_FAIL(allocate_startup_expr_post(second_child))) { LOG_WARN("failed to allocate startup expr post", K(ret)); } } else if (FULL_OUTER_JOIN == join_type_) { @@ -1253,3 +1253,48 @@ bool ObLogJoin::is_my_exec_expr(const ObRawExpr *expr) { return ObOptimizerUtil::find_item(nl_params_, expr); } + +int ObLogJoin::allocate_startup_expr_post(int64_t child_idx) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = get_child(child_idx); + if (OB_ISNULL(child)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null child", K(ret)); + } else if (child->get_startup_exprs().empty()) { + //do nothing + } else { + ObSEArray non_startup_exprs, new_startup_exprs; + ObIArray &startup_exprs = child->get_startup_exprs(); + for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) { + if (OB_ISNULL(startup_exprs.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) { + if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } + } else if (startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) { + bool found = false; + if (is_nlj_with_param_down() + && OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), get_nl_params(), found))) { + LOG_WARN("fail to check if contain my exec param"); + } else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } + } else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("failed to push back expr", K(ret)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) { + LOG_WARN("failed to add startup exprs", K(ret)); + } else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) { + LOG_WARN("failed to assign exprs", K(ret)); + } + } + } + return ret; +} diff --git a/src/sql/optimizer/ob_log_join.h b/src/sql/optimizer/ob_log_join.h index 9c82171b2c..03cc098e72 100644 --- a/src/sql/optimizer/ob_log_join.h +++ b/src/sql/optimizer/ob_log_join.h @@ -183,6 +183,7 @@ namespace sql ObPQDistributeMethod::Type &right_dist_method); bool is_using_slave_mapping() { return SM_NONE != slave_mapping_type_; } int allocate_startup_expr_post() override; + int allocate_startup_expr_post(int64_t child_idx) override; // print outline virtual int print_outline_data(PlanText &plan_text) override; diff --git a/src/sql/optimizer/ob_log_subplan_filter.cpp b/src/sql/optimizer/ob_log_subplan_filter.cpp index 06d70d0870..081ecb27cd 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.cpp +++ b/src/sql/optimizer/ob_log_subplan_filter.cpp @@ -552,7 +552,7 @@ int ObLogSubPlanFilter::check_and_set_use_batch() int ObLogSubPlanFilter::allocate_startup_expr_post() { int ret = OB_SUCCESS; - if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) { + if (OB_FAIL(allocate_startup_expr_post(first_child))) { LOG_WARN("failed to allocate startup expr post", K(ret)); } return ret; @@ -720,3 +720,54 @@ int ObLogSubPlanFilter::compute_equal_set() } return ret; } + +int ObLogSubPlanFilter::allocate_startup_expr_post(int64_t child_idx) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = get_child(child_idx); + if (OB_ISNULL(child)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null child", K(ret)); + } else if (child->get_startup_exprs().empty()) { + //do nothing + } else { + ObSEArray non_startup_exprs, new_startup_exprs; + ObIArray &startup_exprs = child->get_startup_exprs(); + ObSEArray my_exec_params; + if (OB_FAIL(my_exec_params.assign(onetime_exprs_))) { + LOG_WARN("fail to push back onetime exprs", K(ret)); + } else if (OB_FAIL(append(my_exec_params, exec_params_))) { + LOG_WARN("fail to push back exec param exprs", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) { + if (OB_ISNULL(startup_exprs.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) { + if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } + } else if (startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) { + bool found = false; + if (!my_exec_params.empty() + && OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), my_exec_params, found))) { + LOG_WARN("fail to check if contain onetime exec param", K(ret)); + } else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("fail to push back non startup expr",K(ret)); + } + } else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) { + LOG_WARN("failed to push back expr", K(ret)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) { + LOG_WARN("failed to add startup exprs", K(ret)); + } else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) { + LOG_WARN("failed to assign exprs", K(ret)); + } + } + } + return ret; +} diff --git a/src/sql/optimizer/ob_log_subplan_filter.h b/src/sql/optimizer/ob_log_subplan_filter.h index 675c74486b..37a5d64468 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.h +++ b/src/sql/optimizer/ob_log_subplan_filter.h @@ -109,6 +109,8 @@ public: int allocate_startup_expr_post() override; + int allocate_startup_expr_post(int64_t child_idx) override; + int allocate_subquery_id(); int replace_nested_subquery_exprs( diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index 23c1539caf..b04504bb24 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -3903,8 +3903,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx) if (OB_ISNULL(startup_exprs.at(i))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpect null expr", K(ret)); - } else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM) || - startup_exprs.at(i)->has_flag(CNT_DYNAMIC_PARAM)) { + } else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) { if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) { LOG_WARN("failed to push back expr", K(ret)); } @@ -3913,7 +3912,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx) } } if (OB_SUCC(ret)) { - if (OB_FAIL(add_startup_exprs(new_startup_exprs))) { + if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) { LOG_WARN("failed to add startup exprs", K(ret)); } else { //exchange out上面的startup filter保留,用于控制当前dfo提前终止 diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index c0da496815..1d13301590 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -1360,7 +1360,7 @@ public: virtual int has_block_parent_for_shj(bool &has_shj); virtual int allocate_startup_expr_post(); - int allocate_startup_expr_post(int64_t child_idx); + virtual int allocate_startup_expr_post(int64_t child_idx); /** * Start plan tree traverse * diff --git a/src/sql/optimizer/ob_optimizer_util.cpp b/src/sql/optimizer/ob_optimizer_util.cpp index 30082533b9..facc4694e6 100644 --- a/src/sql/optimizer/ob_optimizer_util.cpp +++ b/src/sql/optimizer/ob_optimizer_util.cpp @@ -7971,6 +7971,36 @@ int ObOptimizerUtil::expr_calculable_by_exprs(const ObRawExpr *src_expr, return ret; } +int ObOptimizerUtil::check_contain_my_exec_param(ObRawExpr* expr, const common::ObIArray & my_exec_params, bool &contain) +{ + int ret = OB_SUCCESS; + bool is_stack_overflow = false; + contain = false; + if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (OB_FAIL(check_stack_overflow(is_stack_overflow))) { + LOG_WARN("check stack overflow failed", K(ret)); + } else if (is_stack_overflow) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("too deep recursive", K(ret)); + } else if (!expr->has_flag(CNT_DYNAMIC_PARAM)) { + //do nothing + } else if (expr->is_exec_param_expr()) { + const ObExecParamRawExpr *exec_expr = static_cast(expr); + contain = find_exec_param(my_exec_params, exec_expr); + } else if (expr->is_set_op_expr() || expr->is_query_ref_expr() || expr->is_column_ref_expr()) { + //do nothing + } else { + for (int64_t i = 0; !contain && OB_SUCC(ret) && i < expr->get_param_count(); ++i) { + if (OB_FAIL(SMART_CALL(check_contain_my_exec_param(expr->get_param_expr(i), my_exec_params, contain)))) { + LOG_WARN("failed to check contain batch stmt parameter", K(ret)); + } + } + } + return ret; +} + /* get the smallest set from which all exprs can be evaluated */ int ObOptimizerUtil::get_minset_of_exprs(const ObIArray &src_exprs, ObIArray &min_set) { int ret = OB_SUCCESS; @@ -8539,4 +8569,4 @@ int ObOptimizerUtil::replace_gen_column(ObLogPlan *log_plan, ObRawExpr *part_exp } } return ret; -} \ No newline at end of file +} diff --git a/src/sql/optimizer/ob_optimizer_util.h b/src/sql/optimizer/ob_optimizer_util.h index 56c4661c91..62b8f46005 100644 --- a/src/sql/optimizer/ob_optimizer_util.h +++ b/src/sql/optimizer/ob_optimizer_util.h @@ -1433,6 +1433,8 @@ public: ObOptimizerContext &opt_ctx, ObRawExpr *&calc_part_id_expr); + static int check_contain_my_exec_param(ObRawExpr* expr, const common::ObIArray & my_exec_params, bool &contain); + private: //disallow construct ObOptimizerUtil(); diff --git a/tools/deploy/mysql_test/test_suite/static_engine/r/mysql/subplan_filter.result b/tools/deploy/mysql_test/test_suite/static_engine/r/mysql/subplan_filter.result index e90d5eb6d5..b3a9e11061 100644 --- a/tools/deploy/mysql_test/test_suite/static_engine/r/mysql/subplan_filter.result +++ b/tools/deploy/mysql_test/test_suite/static_engine/r/mysql/subplan_filter.result @@ -866,11 +866,11 @@ Outputs & filters: sort_keys([t1.c1, ASC]) 1 - output([t1.c1]), filter(nil), rowset=256 exec_params_(nil), onetime_exprs_([(T_OP_EXISTS, subquery(1))]), init_plan_idxs_(nil), batch_das=false - 2 - output([t1.c1]), filter(nil), rowset=256 + 2 - output([t1.c1]), filter(nil), startup_filter([:0]), rowset=256 3 - output([t1.c1]), filter(nil), rowset=256 dop=2 4 - output([t1.c1]), filter(nil), rowset=256 - 5 - output([t1.c1]), filter(nil), startup_filter([:0]), rowset=256 + 5 - output([t1.c1]), filter(nil), rowset=256 access([t1.c1]), partitions(p[0-4]) is_index_back=false, is_global_index=false, range_key([t1.__pk_increment]), range(MIN ; MAX)always true @@ -920,11 +920,11 @@ Outputs & filters: sort_keys([t1.c1, ASC]) 1 - output([t1.c1]), filter([(T_OP_EXISTS, subquery(2))]), rowset=256 exec_params_([t1.c1]), onetime_exprs_([(T_OP_EXISTS, subquery(1))]), init_plan_idxs_(nil), batch_das=false - 2 - output([t1.c1]), filter(nil), rowset=256 + 2 - output([t1.c1]), filter(nil), startup_filter([:1]), rowset=256 3 - output([t1.c1]), filter(nil), rowset=256 dop=2 4 - output([t1.c1]), filter(nil), rowset=256 - 5 - output([t1.c1]), filter(nil), startup_filter([:1]), rowset=256 + 5 - output([t1.c1]), filter(nil), rowset=256 access([t1.c1]), partitions(p[0-4]) is_index_back=false, is_global_index=false, range_key([t1.__pk_increment]), range(MIN ; MAX)always true