From 59591c9ea272ad64d67d12287e25a1dfdbe15e21 Mon Sep 17 00:00:00 2001 From: hanr881 <1741282579@qq.com> Date: Tue, 23 May 2023 05:17:26 +0000 Subject: [PATCH] to issue<49845698>:fix pdml with udf report 6220 --- src/pl/ob_pl.cpp | 21 ++++++++++++--------- src/pl/ob_pl.h | 9 +++++++-- src/pl/ob_pl_code_generator.cpp | 2 ++ src/pl/ob_pl_resolver.cpp | 4 +++- src/pl/ob_pl_stmt.h | 5 +++++ src/sql/ob_sql_context.h | 5 ++++- src/sql/optimizer/ob_optimizer.cpp | 5 +++++ src/sql/resolver/dml/ob_dml_resolver.cpp | 12 +----------- src/sql/resolver/ob_resolver_utils.cpp | 13 ++++++++++--- src/sql/resolver/ob_resolver_utils.h | 2 +- 10 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/pl/ob_pl.cpp b/src/pl/ob_pl.cpp index f1d9f7464d..321dd68ef6 100644 --- a/src/pl/ob_pl.cpp +++ b/src/pl/ob_pl.cpp @@ -475,15 +475,16 @@ void ObPLContext::register_after_begin_autonomous_session_for_deadlock_(ObSQLSes int ObPLContext::init(ObSQLSessionInfo &session_info, ObExecContext &ctx, - bool is_autonomous, + ObPLFunction *routine, bool is_function_or_trigger, ObIAllocator *allocator) { int ret = OB_SUCCESS; int64_t pl_block_timeout = 0; int64_t query_start_time = session_info.get_query_start_time(); - - OX (is_autonomous_ = is_autonomous); + CK (OB_NOT_NULL(routine)); + OX (is_function_or_trigger |= routine->is_function()); + OX (is_autonomous_ = routine->is_autonomous()); OX (is_function_or_trigger_ = is_function_or_trigger); OZ (session_info.get_pl_block_timeout(pl_block_timeout)); @@ -579,7 +580,10 @@ int ObPLContext::init(ObSQLSessionInfo &session_info, } } - if (is_function_or_trigger && lib::is_mysql_mode()) { + if (OB_SUCC(ret) && is_function_or_trigger && lib::is_mysql_mode() && + routine->get_has_parallel_affect_factor()) { + // 并行场景下不能创建stash savepoint, 只有当udf/trigger内部有tcl语句时, stash savepoint才有意义 + // udf内部有tcl语句时,该标记为true last_insert_id_ = session_info.get_local_last_insert_id(); const ObString stash_savepoint_name("PL stash savepoint"); OZ (ObSqlTransControl::create_stash_savepoint(ctx, stash_savepoint_name)); @@ -1546,7 +1550,7 @@ int ObPL::execute(ObExecContext &ctx, const ObStmtNodeTree *block) if (OB_SUCC(ret)) { SMART_VAR(ObPLContext, stack_ctx) { LinkPLStackGuard link_stack_guard(ctx, stack_ctx); - OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, routine->is_autonomous(), false)); + OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, routine, false)); try { // execute it. @@ -1650,7 +1654,7 @@ int ObPL::execute(ObExecContext &ctx, if (OB_SUCC(ret)) { SMART_VAR(ObPLContext, stack_ctx) { LinkPLStackGuard link_stack_guard(ctx, stack_ctx); - OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, routine->is_autonomous(), false)); + OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, routine, false)); try { // execute it... @@ -1791,9 +1795,8 @@ int ObPL::execute(ObExecContext &ctx, } // prepare it ... OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, - routine->is_autonomous(), - routine->is_function() - || in_function + routine, + in_function || (package_id != OB_INVALID_ID && ObTriggerInfo::is_trigger_package_id(package_id)), &allocator)); diff --git a/src/pl/ob_pl.h b/src/pl/ob_pl.h index 4495b39b2a..81f6010a96 100644 --- a/src/pl/ob_pl.h +++ b/src/pl/ob_pl.h @@ -448,7 +448,8 @@ public: is_invoker_right_(false), is_pipelined_(false), name_debuginfo_(), - function_name_() { } + function_name_(), + has_parallel_affect_factor_(false) { } virtual ~ObPLFunction(); inline const common::ObIArray &get_variables() const { return variables_; } @@ -490,6 +491,9 @@ public: inline bool is_pipelined() const { return is_pipelined_; } inline void set_pipelined(bool is_pipelined) { is_pipelined_ = is_pipelined; } + inline bool get_has_parallel_affect_factor() const { return has_parallel_affect_factor_; } + inline void set_has_parallel_affect_factor(bool value) { has_parallel_affect_factor_ = value; } + inline const PLCacheObjStat get_stat() const { return stat_; } inline PLCacheObjStat &get_stat_for_update() { return stat_; } @@ -576,6 +580,7 @@ private: common::ObString package_name_; common::ObString database_name_; common::ObString priv_user_; + bool has_parallel_affect_factor_; DISALLOW_COPY_AND_ASSIGN(ObPLFunction); }; @@ -875,7 +880,7 @@ public: int is_inited() { return session_info_ != NULL; } int init(sql::ObSQLSessionInfo &session_info, sql::ObExecContext &ctx, - bool is_autonomous, bool is_function_or_trigger, ObIAllocator *allocator = NULL); + ObPLFunction *routine, bool is_function_or_trigger, ObIAllocator *allocator = NULL); void destory(sql::ObSQLSessionInfo &session_info, sql::ObExecContext &ctx, int &ret); inline ObPLCursorInfo& get_cursor_info() { return cursor_info_; } diff --git a/src/pl/ob_pl_code_generator.cpp b/src/pl/ob_pl_code_generator.cpp index 2cf37c92dc..d105f62634 100644 --- a/src/pl/ob_pl_code_generator.cpp +++ b/src/pl/ob_pl_code_generator.cpp @@ -7978,6 +7978,7 @@ int ObPLCodeGenerator::generate_simple(ObPLFunction &pl_func) OX (pl_func.set_action((uint64_t)(&ObPL::simple_execute))); OX (pl_func.set_can_cached(get_ast().get_can_cached())); OX (pl_func.set_is_all_sql_stmt(get_ast().get_is_all_sql_stmt())); + OX (pl_func.set_has_parallel_affect_factor(get_ast().has_parallel_affect_factor())); OX (sql_infos.set_capacity(static_cast(ast.get_sql_stmts().count()))); for (int64_t i = 0; OB_SUCC(ret) && i < ast.get_sql_stmts().count(); ++i) { @@ -8098,6 +8099,7 @@ int ObPLCodeGenerator::generate_normal(ObPLFunction &pl_func) pl_func.set_action(helper_.get_function_address(get_ast().get_name())); pl_func.set_can_cached(get_ast().get_can_cached()); pl_func.set_is_all_sql_stmt(get_ast().get_is_all_sql_stmt()); + pl_func.set_has_parallel_affect_factor(get_ast().has_parallel_affect_factor()); } } if (debug_mode_) { diff --git a/src/pl/ob_pl_resolver.cpp b/src/pl/ob_pl_resolver.cpp index 87a50e2bd9..4e3dd3aa20 100644 --- a/src/pl/ob_pl_resolver.cpp +++ b/src/pl/ob_pl_resolver.cpp @@ -4649,7 +4649,9 @@ int ObPLResolver::resolve_static_sql(const ObStmtNodeTree *parse_tree, ObPLSql & if (!func.is_modifies_sql_data()) { func.set_reads_sql_data(); } - } else if (ObStmt::is_dml_write_stmt(prepare_result.type_)) { + } else if (ObStmt::is_dml_write_stmt(prepare_result.type_) || + ObStmt::is_savepoint_stmt(prepare_result.type_) || + ObStmt::is_tcl_stmt(prepare_result.type_)) { func.set_modifies_sql_data(); } else if (!func.is_reads_sql_data() && !func.is_modifies_sql_data()) { func.set_contains_sql(); diff --git a/src/pl/ob_pl_stmt.h b/src/pl/ob_pl_stmt.h index 4303db0ede..95a13426db 100644 --- a/src/pl/ob_pl_stmt.h +++ b/src/pl/ob_pl_stmt.h @@ -1719,6 +1719,11 @@ public: inline int add_subprogram_path(int64_t path) { return subprogram_path_.push_back(path); } inline bool get_is_all_sql_stmt() const { return is_all_sql_stmt_; } inline void set_is_all_sql_stmt(bool is_all_sql_stmt) { is_all_sql_stmt_ = is_all_sql_stmt; } + inline bool has_parallel_affect_factor() const + { + return is_reads_sql_data() || is_modifies_sql_data() || is_wps() || + is_rps() || is_has_sequence() || is_external_state(); + } int add_argument(const common::ObString &name, const ObPLDataType &type, const sql::ObRawExpr *expr = NULL, const common::ObIArray *type_info = NULL, diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 110fc2a996..a5190ca299 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -580,7 +580,8 @@ public: tz_info_(NULL), res_map_rule_id_(common::OB_INVALID_ID), res_map_rule_param_idx_(common::OB_INVALID_INDEX), - root_stmt_(NULL) + root_stmt_(NULL), + udf_has_select_stmt_(false) { } TO_STRING_KV(N_PARAM_NUM, question_marks_count_, @@ -620,6 +621,7 @@ public: res_map_rule_id_ = common::OB_INVALID_ID; res_map_rule_param_idx_ = common::OB_INVALID_INDEX; root_stmt_ = NULL; + udf_has_select_stmt_ = false; } int64_t get_new_stmt_id() { return stmt_count_++; } @@ -695,6 +697,7 @@ public: uint64_t res_map_rule_id_; int64_t res_map_rule_param_idx_; ObDMLStmt *root_stmt_; + bool udf_has_select_stmt_; // udf has select stmt, not contain other dml stmt }; } /* ns sql*/ } /* ns oceanbase */ diff --git a/src/sql/optimizer/ob_optimizer.cpp b/src/sql/optimizer/ob_optimizer.cpp index 9bcf4cf736..4daff5628c 100644 --- a/src/sql/optimizer/ob_optimizer.cpp +++ b/src/sql/optimizer/ob_optimizer.cpp @@ -660,6 +660,11 @@ int ObOptimizer::check_whether_contain_nested_sql(const ObDMLStmt &stmt) { int ret = OB_SUCCESS; const ObDelUpdStmt *del_upd_stmt = nullptr; + // dml + select need run das path + if (!stmt.get_query_ctx()->disable_udf_parallel_ && stmt.get_query_ctx()->udf_has_select_stmt_) { + stmt.get_query_ctx()->disable_udf_parallel_ |= ((stmt.is_select_stmt() && stmt.has_for_update()) + || (stmt.is_dml_write_stmt())); + } if (stmt.get_query_ctx()->disable_udf_parallel_) { ctx_.set_has_pl_udf(true); } diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index aaec973226..99f179dbca 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -10825,20 +10825,10 @@ int ObDMLResolver::resolve_external_name(ObQualifiedName &q_name, OX (expr->set_is_called_in_sql(true)); } - bool is_dml_stmt = false; - if (OB_FAIL(ret)) { - } else if (ObStmt::is_dml_write_stmt(stmt->get_stmt_type())) { - is_dml_stmt = true; - } else if (stmt::T_SELECT == stmt->get_stmt_type()) { - bool has_for_update = false; - OZ (stmt->check_if_contain_select_for_update(has_for_update)); - OX (is_dml_stmt = has_for_update); - } - OZ (ObResolverUtils::set_parallel_info(*params_.session_info_, *params_.schema_checker_->get_schema_guard(), *expr, - is_dml_stmt)); + stmt_->get_query_ctx()->udf_has_select_stmt_)); OX (stmt_->get_query_ctx()->disable_udf_parallel_ |= !udf_expr->is_parallel_enable()); if (OB_SUCC(ret) && udf_expr->get_result_type().is_ext() && diff --git a/src/sql/resolver/ob_resolver_utils.cpp b/src/sql/resolver/ob_resolver_utils.cpp index dc59e01e5d..25e48266b9 100644 --- a/src/sql/resolver/ob_resolver_utils.cpp +++ b/src/sql/resolver/ob_resolver_utils.cpp @@ -6422,10 +6422,16 @@ int ObResolverUtils::resolve_string(const ParseNode *node, ObString &string) return ret; } +// judge whether pdml stmt contain udf can parallel execute or not has two stage: +// stage1:check has dml write stmt or read/write package var info in this funciton; +// stage2:record udf has select stmt info, and when optimize this stmt, +// according outer stmt type to determine, if udf has select stmt: +// case1: if outer stmt is select, can paralllel +// case2: if outer stmt is dml write stmt, forbid parallel int ObResolverUtils::set_parallel_info(sql::ObSQLSessionInfo &session_info, share::schema::ObSchemaGetterGuard &schema_guard, ObRawExpr &expr, - bool is_dml_stmt) + bool &contain_select_stmt) { int ret = OB_SUCCESS; const ObRoutineInfo *routine_info = NULL; @@ -6455,8 +6461,9 @@ int ObResolverUtils::set_parallel_info(sql::ObSQLSessionInfo &session_info, routine_info->is_has_sequence() || routine_info->is_external_state()) { enable_parallel = false; - } else if (is_dml_stmt && routine_info->is_reads_sql_data()) { - enable_parallel = false; + } + if (routine_info->is_reads_sql_data()) { + contain_select_stmt = true; } OX (udf_raw_expr.set_parallel_enable(enable_parallel)); } diff --git a/src/sql/resolver/ob_resolver_utils.h b/src/sql/resolver/ob_resolver_utils.h index 8113b1ba28..766f2986b9 100644 --- a/src/sql/resolver/ob_resolver_utils.h +++ b/src/sql/resolver/ob_resolver_utils.h @@ -284,7 +284,7 @@ public: static int set_parallel_info(sql::ObSQLSessionInfo &session_info, share::schema::ObSchemaGetterGuard &schema_guard, ObRawExpr &expr, - bool is_dml_stmt); + bool &contain_select_stmt); static int resolve_external_symbol(common::ObIAllocator &allocator, sql::ObRawExprFactory &expr_factory,