[CP] 修复向量子查询更新的计划生成问题

This commit is contained in:
zzg19950727 2024-10-12 06:01:57 +00:00 committed by ob-robot
parent acc0e6d5da
commit 25ae675abd
9 changed files with 171 additions and 118 deletions

View File

@ -2394,3 +2394,100 @@ int ObDelUpdLogPlan::check_is_direct_load(const ObInsertStmt &insert_stmt, const
}
return ret;
}
int ObDelUpdLogPlan::perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)
{
UNUSED(stmt);
int ret = OB_SUCCESS;
return ret;
}
int ObDelUpdLogPlan::replace_alias_ref_expr(ObRawExpr *&expr, bool &replace_happened)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null expr", K(ret));
} else if (expr->is_alias_ref_expr()) {
ObAliasRefRawExpr *alias = static_cast<ObAliasRefRawExpr *>(expr);
if (OB_UNLIKELY(!alias->is_ref_query_output())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid alias expr", K(ret), K(*alias));
} else {
expr = alias->get_ref_expr();
replace_happened = true;
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(replace_alias_ref_expr(expr->get_param_expr(i), replace_happened))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
}
}
}
return ret;
}
int ObDelUpdLogPlan::candi_allocate_subplan_filter_for_assignments(ObIArray<ObRawExpr*> &assign_exprs)
{
int ret = OB_SUCCESS;
ObSEArray<ObRawExpr*, 4> normal_query_refs;
ObSEArray<ObRawExpr*, 4> alias_query_refs;
for (int64_t i = 0; OB_SUCC(ret) && i < assign_exprs.count(); ++i) {
if (OB_FAIL(extract_assignment_subqueries(assign_exprs.at(i),
normal_query_refs,
alias_query_refs))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (!normal_query_refs.empty() &&
OB_FAIL(candi_allocate_subplan_filter(normal_query_refs, NULL, false))) {
// step. allocate subplan filter for "dml .. set c1 = (select)"
LOG_WARN("failed to allocate subplan", K(ret));
} else if (!alias_query_refs.empty() &&
OB_FAIL(candi_allocate_subplan_filter(alias_query_refs, NULL, true))) {
// step. allocate subplan filter for "dml .. set (a,b)=(select..), (c,d)=(select..)"
LOG_WARN("failed to allocate subplan filter", K(ret));
} else {
LOG_TRACE("succeed to allocate subplan filter for assignment", K(ret),
K(normal_query_refs.count()), K(alias_query_refs.count()));
}
return ret;
}
int ObDelUpdLogPlan::extract_assignment_subqueries(ObRawExpr *expr,
ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr is null", K(ret));
} else if (expr->has_flag(CNT_ONETIME)
|| expr->is_query_ref_expr()
|| T_OP_EXISTS == expr->get_expr_type()
|| T_OP_NOT_EXISTS == expr->get_expr_type()
|| expr->has_flag(IS_WITH_ALL)
|| expr->has_flag(IS_WITH_ANY)) {
if (OB_FAIL(add_var_to_array_no_dup(normal_query_refs, expr))) {
LOG_WARN("failed to add var to array no dup", K(ret));
}
} else if (expr->is_alias_ref_expr()) {
ObAliasRefRawExpr *alias = static_cast<ObAliasRefRawExpr *>(expr);
if (OB_UNLIKELY(!alias->is_ref_query_output())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid alias expr", K(ret), K(*alias));
} else if (OB_FAIL(add_var_to_array_no_dup(alias_query_refs, alias->get_param_expr(0)))) {
LOG_WARN("failed to add var to array with out duplicate", K(ret));
}
} else if (expr->has_flag(CNT_SUB_QUERY)) {
for (int64_t i = 0; OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(SMART_CALL(extract_assignment_subqueries(expr->get_param_expr(i),
normal_query_refs,
alias_query_refs)))) {
LOG_WARN("failed to extract query ref expr", K(ret));
}
}
}
return ret;
}

View File

@ -241,6 +241,7 @@ public:
int check_is_direct_load(const ObInsertStmt &insert_stmt, const int64_t dml_parallel);
int get_parallel_info_from_candidate_plans(int64_t &dop) const;
int get_pdml_parallel_degree(const int64_t target_part_cnt, int64_t &dop) const;
virtual int perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt);
protected:
virtual int generate_normal_raw_plan() override;
@ -248,6 +249,12 @@ protected:
int allocate_optimizer_stats_gathering_as_top(ObLogicalOperator *&old_top,
OSGShareInfo &info,
OSG_TYPE type);
int replace_alias_ref_expr(ObRawExpr *&expr, bool &replace_happened);
int candi_allocate_subplan_filter_for_assignments(ObIArray<ObRawExpr*> &assign_exprs);
int extract_assignment_subqueries(ObRawExpr *expr,
ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs);
private:
DISALLOW_COPY_AND_ASSIGN(ObDelUpdLogPlan);

View File

@ -84,8 +84,8 @@ int ObInsertLogPlan::generate_normal_raw_plan()
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "update values contain non onetime subquery");
LOG_WARN("update values contain non onetime subquery", K(ret));
} else if (!subquery.empty() && OB_FAIL(candi_allocate_subplan_filter(subquery))) {
LOG_WARN("failed to allocate subplan", K(ret));
} else if (OB_FAIL(candi_allocate_subplan_filter_for_assignments(assign_exprs))) {
LOG_WARN("failed to allocate subplan filter for assignments", K(ret));
} else { /*do nothing*/ }
}
@ -1843,3 +1843,25 @@ int ObInsertLogPlan::allocate_select_into_as_top_for_insert(ObLogicalOperator *&
}
return ret;
}
int ObInsertLogPlan::perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* session_info = optimizer_context_.get_session_info();
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("stmt is null", K(ret), K(stmt));
} else {
ObInsertTableInfo &table_info = static_cast<ObInsertStmt*>(stmt)->get_insert_table_info();
for (int64_t i = 0; OB_SUCC(ret) && i < table_info.assignments_.count(); ++i) {
ObRawExpr *value = table_info.assignments_.at(i).expr_;
bool replace_happened = false;
if (OB_FAIL(replace_alias_ref_expr(value, replace_happened))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
} else if (replace_happened && OB_FAIL(value->formalize(session_info))) {
LOG_WARN("failed to formalize expr", K(ret));
}
}
}
return ret;
}

View File

@ -48,6 +48,7 @@ public:
const common::ObIArray<IndexDMLInfo *> &get_insert_up_index_upd_infos() const
{ return insert_up_index_upd_infos_; }
virtual int perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)override;
protected:
int allocate_insert_values_as_top(ObLogicalOperator *&top);
int candi_allocate_insert(OSGShareInfo *osg_info);

View File

@ -10869,13 +10869,15 @@ int ObLogPlan::adjust_final_plan_info(ObLogicalOperator *&op)
LOG_WARN("failed to allocate subquery id", K(ret));
} else if (!subplan_filter->is_update_set()) {
// do nothing
} else if (OB_UNLIKELY(!subplan_filter->get_stmt()->is_update_stmt())) {
} else if (OB_UNLIKELY(!subplan_filter->get_stmt()->is_insert_stmt() &&
!subplan_filter->get_stmt()->is_merge_stmt() &&
!subplan_filter->get_stmt()->is_update_stmt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("update stmt is expected", K(ret));
} else {
ObUpdateLogPlan *plan = static_cast<ObUpdateLogPlan *>(op->get_plan());
ObDelUpdLogPlan *plan = static_cast<ObDelUpdLogPlan *>(op->get_plan());
// stmt is only allowed to be modified in the function;
ObUpdateStmt *stmt = const_cast<ObUpdateStmt* >(plan->get_stmt());
ObDelUpdStmt *stmt = const_cast<ObDelUpdStmt* >(plan->get_stmt());
if (OB_FAIL(plan->perform_vector_assign_expr_replacement(stmt))) {
LOG_WARN("failed to perform vector assgin expr replace", K(ret));
}

View File

@ -337,9 +337,6 @@ int ObMergeLogPlan::candi_allocate_subplan_filter_for_merge()
LOG_WARN("failed to get insert conditions", K(ret));
} else if (OB_FAIL(merge_stmt->get_assignments_exprs(assign_exprs))) {
LOG_WARN("failed to get table assignment", K(ret));
} else if (OB_FAIL(ObOptimizerUtil::get_subquery_exprs(assign_exprs,
target_subquery_exprs))) {
LOG_WARN("failed to get subquery exprs", K(ret));
} else if (OB_FAIL(ObOptimizerUtil::get_subquery_exprs(merge_stmt->get_values_vector(),
target_subquery_exprs))) {
LOG_WARN("failed to get target subquery exprs", K(ret));
@ -349,6 +346,8 @@ int ObMergeLogPlan::candi_allocate_subplan_filter_for_merge()
} else if (!get_subquery_filters().empty() &&
candi_allocate_subplan_filter_for_where()) {
LOG_WARN("failed to allocate subplan filter", K(ret));
} else if (OB_FAIL(candi_allocate_subplan_filter_for_assignments(assign_exprs))) {
LOG_WARN("failed to allocate subplan filter for assignments", K(ret));
} else if (condition_subquery_exprs.empty() && target_subquery_exprs.empty() &&
delete_subquery_exprs.empty()) {
// do nothing
@ -1035,3 +1034,25 @@ int ObMergeLogPlan::check_merge_stmt_need_multi_partition_dml(bool &is_multi_par
}
return ret;
}
int ObMergeLogPlan::perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* session_info = optimizer_context_.get_session_info();
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("stmt is null", K(ret), K(stmt));
} else {
ObMergeTableInfo &table_info = static_cast<ObMergeStmt*>(stmt)->get_merge_table_info();
for (int64_t i = 0; OB_SUCC(ret) && i < table_info.assignments_.count(); ++i) {
ObRawExpr *value = table_info.assignments_.at(i).expr_;
bool replace_happened = false;
if (OB_FAIL(replace_alias_ref_expr(value, replace_happened))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
} else if (replace_happened && OB_FAIL(value->formalize(session_info))) {
LOG_WARN("failed to formalize expr", K(ret));
}
}
}
return ret;
}

View File

@ -40,6 +40,7 @@ public:
ObIArray<ObRawExpr *>& get_delete_condition() { return delete_condition_exprs_; }
const ObIArray<ObRawExpr *>& get_delete_condition() const { return delete_condition_exprs_; }
virtual int prepare_dml_infos() override;
virtual int perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)override;
private:
virtual int generate_normal_raw_plan() override;
int candi_allocate_merge();

View File

@ -167,21 +167,15 @@ int ObUpdateLogPlan::generate_normal_raw_plan()
// 下面针对 assign 部分涉及的查询生成计划,其输出是更新后的值
//
if (OB_SUCC(ret)) {
ObSEArray<ObRawExpr*, 4> normal_query_refs;
ObSEArray<ObRawExpr*, 4> alias_query_refs;
if (OB_FAIL(extract_assignment_subqueries(normal_query_refs, alias_query_refs))) {
LOG_WARN("failed to get assignment subquery exprs", K(ret));
} else if (!normal_query_refs.empty() &&
OB_FAIL(candi_allocate_subplan_filter(normal_query_refs, NULL, false))) {
// step. allocate subplan filter for "update .. set c1 = (select)"
LOG_WARN("failed to allocate subplan", K(ret));
} else if (!alias_query_refs.empty() &&
OB_FAIL(candi_allocate_subplan_filter(alias_query_refs, NULL, true))) {
// step. allocate subplan filter for "update .. set (a,b)=(select..), (c,d)=(select..)"
LOG_WARN("failed to allocate subplan filter", K(ret));
} else {
LOG_TRACE("succeed to allocate subplan filter for assignment", K(ret),
K(normal_query_refs.count()), K(alias_query_refs.count()));
const ObUpdateStmt *update_stmt = get_stmt();
ObSEArray<ObRawExpr*, 8> assign_exprs;
if (OB_ISNULL(update_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret), K(update_stmt));
} else if (OB_FAIL(update_stmt->get_assignments_exprs(assign_exprs))) {
LOG_WARN("failed to get assign exprs", K(ret));
} else if (OB_FAIL(candi_allocate_subplan_filter_for_assignments(assign_exprs))) {
LOG_WARN("failed to allocate subplan filter for assignments", K(ret));
}
}
@ -461,12 +455,12 @@ int ObUpdateLogPlan::candi_allocate_pdml_update()
return ret;
}
int ObUpdateLogPlan::perform_vector_assign_expr_replacement(ObUpdateStmt *stmt)
int ObUpdateLogPlan::perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)
{
int ret = OB_SUCCESS;
ObUpdateTableInfo* table_info = nullptr;
ObSQLSessionInfo* session_info = optimizer_context_.get_session_info();
if (OB_ISNULL(stmt) || OB_ISNULL(table_info = stmt->get_update_table_info().at(0))) {
if (OB_ISNULL(stmt) || OB_ISNULL(table_info = static_cast<ObUpdateStmt*>(stmt)->get_update_table_info().at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("stmt is null", K(ret), K(stmt), K(table_info));
} else {
@ -483,91 +477,6 @@ int ObUpdateLogPlan::perform_vector_assign_expr_replacement(ObUpdateStmt *stmt)
return ret;
}
int ObUpdateLogPlan::replace_alias_ref_expr(ObRawExpr *&expr, bool &replace_happened)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null expr", K(ret));
} else if (expr->is_alias_ref_expr()) {
ObAliasRefRawExpr *alias = static_cast<ObAliasRefRawExpr *>(expr);
if (OB_UNLIKELY(!alias->is_ref_query_output())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid alias expr", K(ret), K(*alias));
} else {
expr = alias->get_ref_expr();
replace_happened = true;
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(replace_alias_ref_expr(expr->get_param_expr(i), replace_happened))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
}
}
}
return ret;
}
int ObUpdateLogPlan::extract_assignment_subqueries(ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs)
{
int ret = OB_SUCCESS;
const ObUpdateStmt *update_stmt = get_stmt();
ObSEArray<ObRawExpr*, 8> assign_exprs;
if (OB_ISNULL(update_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret), K(update_stmt));
} else if (OB_FAIL(update_stmt->get_assignments_exprs(assign_exprs))) {
LOG_WARN("failed to get assign exprs", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < assign_exprs.count(); ++i) {
if (OB_FAIL(extract_assignment_subqueries(assign_exprs.at(i),
normal_query_refs,
alias_query_refs))) {
LOG_WARN("failed to replace alias ref expr", K(ret));
}
}
}
return ret;
}
int ObUpdateLogPlan::extract_assignment_subqueries(ObRawExpr *expr,
ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr is null", K(ret));
} else if (expr->has_flag(CNT_ONETIME)
|| expr->is_query_ref_expr()
|| T_OP_EXISTS == expr->get_expr_type()
|| T_OP_NOT_EXISTS == expr->get_expr_type()
|| expr->has_flag(IS_WITH_ALL)
|| expr->has_flag(IS_WITH_ANY)) {
if (OB_FAIL(add_var_to_array_no_dup(normal_query_refs, expr))) {
LOG_WARN("failed to add var to array no dup", K(ret));
}
} else if (expr->is_alias_ref_expr()) {
ObAliasRefRawExpr *alias = static_cast<ObAliasRefRawExpr *>(expr);
if (OB_UNLIKELY(!alias->is_ref_query_output())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid alias expr", K(ret), K(*alias));
} else if (OB_FAIL(add_var_to_array_no_dup(alias_query_refs, alias->get_param_expr(0)))) {
LOG_WARN("failed to add var to array with out duplicate", K(ret));
}
} else if (expr->has_flag(CNT_SUB_QUERY)) {
for (int64_t i = 0; OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(SMART_CALL(extract_assignment_subqueries(expr->get_param_expr(i),
normal_query_refs,
alias_query_refs)))) {
LOG_WARN("failed to extract query ref expr", K(ret));
}
}
}
return ret;
}
int ObUpdateLogPlan::prepare_dml_infos()
{
int ret = OB_SUCCESS;

View File

@ -31,7 +31,7 @@ namespace sql
const ObUpdateStmt *get_stmt() const override
{ return reinterpret_cast<const ObUpdateStmt*>(stmt_); }
int perform_vector_assign_expr_replacement(ObUpdateStmt *stmt);
virtual int perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt)override;
protected:
virtual int generate_normal_raw_plan() override;
@ -47,13 +47,6 @@ namespace sql
int allocate_update_as_top(ObLogicalOperator *&top,
ObConstRawExpr *lock_row_flag_expr,
bool is_multi_part_dml);
int replace_alias_ref_expr(ObRawExpr *&expr, bool &replace_happened);
int extract_assignment_subqueries(ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs);
int extract_assignment_subqueries(ObRawExpr *expr,
ObIArray<ObRawExpr*> &normal_query_refs,
ObIArray<ObRawExpr*> &alias_query_refs);
virtual int prepare_dml_infos() override;
virtual int prepare_table_dml_info_special(const ObDmlTableInfo& table_info,
IndexDMLInfo* table_dml_info,