From 4453ef5eee6db9b737df9268fc6fd9fa2e5c1300 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 4 Jan 2023 08:08:15 +0000 Subject: [PATCH] Fix some query rewrite bug --- src/pl/CMakeLists.txt | 1 + src/pl/ob_pl_interface_pragma.h | 7 + .../sys_package/ob_dbms_user_define_rule.cpp | 371 ++++++++++++++++++ src/sql/ob_sql.cpp | 3 +- src/sql/ob_sql_context.cpp | 3 + src/sql/ob_sql_context.h | 1 + src/sql/parser/sql_parser_mysql_mode.l | 2 - 7 files changed, 385 insertions(+), 3 deletions(-) create mode 100644 src/pl/sys_package/ob_dbms_user_define_rule.cpp diff --git a/src/pl/CMakeLists.txt b/src/pl/CMakeLists.txt index 1f19623e05..4666d8abd9 100644 --- a/src/pl/CMakeLists.txt +++ b/src/pl/CMakeLists.txt @@ -45,6 +45,7 @@ ob_set_subtarget(ob_pl sys_package sys_package/ob_dbms_monitor.cpp sys_package/ob_dbms_upgrade.cpp sys_package/ob_dbms_sql.cpp + sys_package/ob_dbms_user_define_rule.cpp ) ob_server_add_target(ob_pl) diff --git a/src/pl/ob_pl_interface_pragma.h b/src/pl/ob_pl_interface_pragma.h index 0f06d93413..afe061445e 100644 --- a/src/pl/ob_pl_interface_pragma.h +++ b/src/pl/ob_pl_interface_pragma.h @@ -109,6 +109,13 @@ #undef DEFINE_DBMS_SCHEDULER_MYSQL_INTERFACE //end of dbms_scheduler_mysql + + // start of dbms_udr + INTERFACE_DEF(INTERFACE_DBMS_UDR_CREATE_RULE, "CREATE_RULE", (void *)(ObDBMSUserDefineRule::create_rule)) + INTERFACE_DEF(INTERFACE_DBMS_UDR_REMOVE_RULE, "REMOVE_RULE", (void *)(ObDBMSUserDefineRule::remove_rule)) + INTERFACE_DEF(INTERFACE_DBMS_UDR_ENABLE_RULE, "ENABLE_RULE", (void *)(ObDBMSUserDefineRule::enable_rule)) + INTERFACE_DEF(INTERFACE_DBMS_UDR_DISABLE_RULE, "DISABLE_RULE", (void *)(ObDBMSUserDefineRule::disable_rule)) + // end of dbms_udr /****************************************************************************/ INTERFACE_DEF(INTERFACE_END, "INVALID", (void*)(NULL)) diff --git a/src/pl/sys_package/ob_dbms_user_define_rule.cpp b/src/pl/sys_package/ob_dbms_user_define_rule.cpp new file mode 100644 index 0000000000..d0b90a609a --- /dev/null +++ b/src/pl/sys_package/ob_dbms_user_define_rule.cpp @@ -0,0 +1,371 @@ +// Copyright 2015-2016 Alibaba Inc. All Rights Reserved. +// Author: +// LuoFan luofan.zp@alibaba-inc.com +// Normalizer: +// LuoFan luofan.zp@alibaba-inc.com + +#define USING_LOG_PREFIX PL + +#include "pl/ob_pl_package_manager.h" +#include "share/ob_errno.h" +#include "lib/utility/ob_macro_utils.h" +#include "lib/oblog/ob_log_module.h" +#include "sql/executor/ob_task_executor_ctx.h" +#include "sql/parser/ob_parser.h" +#include "sql/resolver/ob_resolver_utils.h" +#include "sql/engine/ob_exec_context.h" +#include "sql/udr/ob_udr_analyzer.h" +#include "sql/udr/ob_udr_mgr.h" +#include "observer/ob_server_struct.h" +#include "pl/sys_package/ob_dbms_user_define_rule.h" + +namespace oceanbase +{ +using namespace obrpc; +namespace pl +{ + +#define DEF_UDR_FUNC_IMPL(func_name, Processor) \ + int ObDBMSUserDefineRule::func_name(sql::ObExecContext &ctx, \ + sql::ParamStore ¶ms, \ + common::ObObj &result) \ + { \ + int ret = OB_SUCCESS; \ + Processor processor(ctx, params, result); \ + if (OB_FAIL(processor.init())) { \ + LOG_WARN("processor init failed", K(ret)); \ + } else if (OB_FAIL(processor.process())) { \ + LOG_WARN("failed to process", K(ret)); \ + } \ + return ret; \ + } + +int ObUDRProcessor::init() +{ + int ret = OB_SUCCESS; + is_inited_ = true; + return ret; +} + +int ObUDRProcessor::pre_execution_check() +{ + int ret = OB_SUCCESS; + if (arg_.rule_name_.empty()) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "empty rule name"); + LOG_WARN("empty rule name not supported", K(ret)); + } else if (arg_.rule_name_.length() > OB_MAX_ORIGINAL_NANE_LENGTH) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "rule name length exceeding 256 is"); + LOG_WARN("rule name length exceeding 256 is not supported", K(ret)); + } + return ret; +} + +int ObUDRProcessor::sync_rule_from_inner_table() +{ + int ret = OB_SUCCESS; + ObSyncRewriteRuleArg sync_udr_arg; + sql::ObBasicSessionInfo *session = NULL; + sql::ObTaskExecutorCtx *task_exec_ctx = NULL; + obrpc::ObCommonRpcProxy *common_rpc = NULL; + if (OB_ISNULL(session = ctx_.get_my_session()) || + OB_ISNULL(task_exec_ctx = ctx_.get_task_executor_ctx()) || + OB_ISNULL(common_rpc = task_exec_ctx->get_common_rpc())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret), K(task_exec_ctx), K(common_rpc)); + } else { + sync_udr_arg.tenant_id_ = session->get_effective_tenant_id(); + if (OB_FAIL(common_rpc->by(sync_udr_arg.tenant_id_) + .as(sync_udr_arg.tenant_id_) + .admin_sync_rewrite_rules(sync_udr_arg))) { + LOG_WARN("sync rewrite rules rpc failed ", K(ret), "rpc_arg", sync_udr_arg); + } + } + return ret; +} + +int ObUDRProcessor::process() +{ + int ret = OB_SUCCESS; + uint64_t data_version = 0; + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("processor should be inited first", K(ret)); + } else if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(session->get_effective_tenant_id(), data_version))) { + LOG_WARN("failed to get min data version", K(ret)); + } else if (data_version < DATA_VERSION_4_1_0_0) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "user-defined_rules with minimal data version < 4100"); + LOG_WARN("user-defined_rules with minimal data version < 4100", K(ret)); + } else if (OB_FAIL(parse_request_param())) { + LOG_WARN("failed to parse request param", K(ret)); + } else if (OB_FAIL(generate_exec_arg())) { + LOG_WARN("failed to generate exec arg", K(ret)); + } else if (OB_FAIL(pre_execution_check())) { + LOG_WARN("failed to pre execution check", K(ret)); + } else if (OB_FAIL(execute())) { + LOG_WARN("failed to execute rpc", K(ret)); + } else if (OB_FAIL(sync_rule_from_inner_table())) { + LOG_WARN("failed to sync rule from inner table", K(ret)); + } + return ret; +} + +/** + * PROCEDURE CREATE_RULE ( + * rule_name VARCHAR(256), + * database_name VARCHAR(128), + * pattern LONGTEXT, + * replacement LONGTEXT, + * enabled IN VARCHAR := 'YES' + *); + */ +int ObCreateRuleProcessor::parse_request_param() +{ + int ret = OB_SUCCESS; + ObString enabled; + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + LOG_DEBUG("parse request param", K(params_)); + if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if ((lib::is_oracle_mode() && 4 != params_.count()) || + (!lib::is_oracle_mode() && 5 != params_.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid params", K(params_.count())); + } else if (lib::is_oracle_mode()) { + OV (params_.at(0).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(0).get_string(arg_.rule_name_)); + OV (params_.at(1).is_lob_locator(), OB_INVALID_ARGUMENT); + OX (arg_.pattern_.assign_ptr(params_.at(1).get_lob_locator()->get_payload_ptr(), + static_cast(params_.at(1).get_lob_locator()->get_payload_length()))); + OV (params_.at(2).is_lob_locator(), OB_INVALID_ARGUMENT); + OX (arg_.replacement_.assign_ptr(params_.at(2).get_lob_locator()->get_payload_ptr(), + static_cast(params_.at(2).get_lob_locator()->get_payload_length()))); + OV (params_.at(3).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(3).get_string(enabled)); + OX (arg_.db_name_ = session->get_database_name()); + } else { + OV (params_.at(0).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(0).get_string(arg_.rule_name_)); + OV (params_.at(1).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(1).get_string(arg_.db_name_)); + OV (params_.at(2).is_text(), OB_INVALID_ARGUMENT); + OZ (params_.at(2).get_string(arg_.pattern_)); + OV (params_.at(3).is_text(), OB_INVALID_ARGUMENT); + OZ (params_.at(3).get_string(arg_.replacement_)); + OV (params_.at(4).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(4).get_string(enabled)); + } + if (OB_SUCC(ret)) { + arg_.tenant_id_ = session->get_effective_tenant_id(); + arg_.coll_type_ = session->get_local_collation_connection(); + if (0 == enabled.case_compare("yes")) { + arg_.rule_status_ = ObUDRInfo::ENABLE_STATUS; + } else if (0 == enabled.case_compare("no")) { + arg_.rule_status_ = ObUDRInfo::DISABLE_STATUS; + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid fixed param", K(enabled), K(ret)); + } + } + return ret; +} + +int ObCreateRuleProcessor::generate_exec_arg() +{ +#define ISSPACE(c) ((c) == ' ' || (c) == '\n' || (c) == '\r' || (c) == '\t' || (c) == '\f' || (c) == '\v') + int ret = OB_SUCCESS; + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else { + ObUDRAnalyzer analyzer(ctx_.get_allocator(), + session->get_sql_mode(), + session->get_local_collation_connection()); + if (OB_FAIL(analyzer.parse_and_check(arg_.pattern_, arg_.replacement_))) { + LOG_WARN("failed to parse and check", K(ret)); + } else if (OB_FAIL(analyzer.parse_pattern_to_gen_param_infos_str( + arg_.pattern_, + arg_.normalized_pattern_, + arg_.fixed_param_infos_str_, + arg_.dynamic_param_infos_str_, + arg_.question_mark_ctx_str_))) { + LOG_WARN("failed to parse to gen param infos", K(ret), K(arg_.pattern_)); + } else { + const ObString stmt(arg_.normalized_pattern_.length(), arg_.normalized_pattern_.ptr()); + int64_t len = stmt.length(); + while (len > 0 && ISSPACE(stmt[len - 1])) { + --len; + } + if (';' == stmt[len - 1]) { + --len; + } + arg_.normalized_pattern_.assign_ptr(stmt.ptr(), len); + arg_.pattern_digest_ = arg_.normalized_pattern_.hash(); + } + } + if (OB_SUCC(ret)) { + LOG_DEBUG("succ generate exec arg", K(arg_)); + } + return ret; +} + +int ObCreateRuleProcessor::execute() +{ + int ret = OB_SUCCESS; + sql::ObUDRMgr *rule_mgr = MTL(sql::ObUDRMgr*); + if (OB_FAIL(rule_mgr->insert_rule(arg_))) { + LOG_WARN("failed to insert rewrite rule", K(ret), K(arg_)); + } + return ret; +} + +/** + * PROCEDURE REMOVE_RULE ( + * rule_name VARCHAR(256) + *); + */ +int ObRemoveRuleProcessor::parse_request_param() +{ + int ret = OB_SUCCESS; + bool is_enabled = false; + LOG_DEBUG("parse request param", K(params_)); + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if (1 != params_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid params", K(params_.count())); + } else { + OV (params_.at(0).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(0).get_string(arg_.rule_name_)); + } + if (OB_SUCC(ret)) { + arg_.tenant_id_ = session->get_effective_tenant_id(); + } + return ret; +} + +int ObRemoveRuleProcessor::generate_exec_arg() +{ + int ret = OB_SUCCESS; + arg_.rule_status_ = ObUDRInfo::DELETE_STATUS; + LOG_DEBUG("succ generate exec arg", K(arg_)); + return ret; +} + +int ObRemoveRuleProcessor::execute() +{ + int ret = OB_SUCCESS; + sql::ObUDRMgr *rule_mgr = MTL(sql::ObUDRMgr*); + if (OB_FAIL(rule_mgr->remove_rule(arg_))) { + LOG_WARN("failed to remove rewrite rule", K(ret), K(arg_)); + } + return ret; +} + +/** + * PROCEDURE ENABLE_RULE ( + * rule_name VARCHAR(256) + *); + */ +int ObEnableRuleProcessor::parse_request_param() +{ + int ret = OB_SUCCESS; + bool is_enabled = false; + LOG_DEBUG("parse request param", K(params_)); + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if (1 != params_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid params", K(params_.count())); + } else { + OV (params_.at(0).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(0).get_string(arg_.rule_name_)); + } + if (OB_SUCC(ret)) { + arg_.tenant_id_ = session->get_effective_tenant_id(); + } + return ret; +} + +int ObEnableRuleProcessor::generate_exec_arg() +{ + int ret = OB_SUCCESS; + arg_.rule_status_ = ObUDRInfo::ENABLE_STATUS; + LOG_DEBUG("succ generate exec arg", K(arg_)); + return ret; +} + +int ObEnableRuleProcessor::execute() +{ + int ret = OB_SUCCESS; + sql::ObUDRMgr *rule_mgr = MTL(sql::ObUDRMgr*); + if (OB_FAIL(rule_mgr->alter_rule_status(arg_))) { + LOG_WARN("failed to alter rewrite rule status", K(ret), K(arg_)); + } + return ret; +} + +/** + * PROCEDURE DISABLED_RULE ( + * rule_name VARCHAR(256) + *); + */ +int ObDisableRuleProcessor::parse_request_param() +{ + int ret = OB_SUCCESS; + bool is_enabled = false; + LOG_DEBUG("parse request param", K(params_)); + sql::ObSQLSessionInfo *session = ctx_.get_my_session(); + if (OB_ISNULL(session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else if (1 != params_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid params", K(params_.count())); + } else { + OV (params_.at(0).is_varchar(), OB_INVALID_ARGUMENT); + OZ (params_.at(0).get_string(arg_.rule_name_)); + } + if (OB_SUCC(ret)) { + arg_.tenant_id_ = session->get_effective_tenant_id(); + } + return ret; +} + +int ObDisableRuleProcessor::generate_exec_arg() +{ + int ret = OB_SUCCESS; + arg_.rule_status_ = ObUDRInfo::DISABLE_STATUS; + LOG_DEBUG("succ generate exec arg", K(arg_)); + return ret; +} + +int ObDisableRuleProcessor::execute() +{ + int ret = OB_SUCCESS; + sql::ObUDRMgr *rule_mgr = MTL(sql::ObUDRMgr*); + if (OB_FAIL(rule_mgr->alter_rule_status(arg_))) { + LOG_WARN("failed to alter rewrite rule status", K(ret), K(arg_)); + } + return ret; +} + +DEF_UDR_FUNC_IMPL(create_rule, ObCreateRuleProcessor); +DEF_UDR_FUNC_IMPL(remove_rule, ObRemoveRuleProcessor); +DEF_UDR_FUNC_IMPL(enable_rule, ObEnableRuleProcessor); +DEF_UDR_FUNC_IMPL(disable_rule, ObDisableRuleProcessor); + +} // end of pl +} // end of oceanbase diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 0462ae7eca..944d224072 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -1921,7 +1921,7 @@ OB_INLINE int ObSql::handle_text_query(const ObString &stmt, ObSqlCtx &context, // if get plan from plan cache, reset its auto-increment variable here // do not set variable for hidden primary key; its default value is 1 if (OB_SUCC(ret)) { - if (!context.is_prepare_protocol_ + if (!context.is_text_ps_mode_ && OB_FAIL(after_get_plan(*pc_ctx, session, result.get_physical_plan(), result.get_is_from_plan_cache(), NULL))) { LOG_WARN("fail to handle after get plan", K(ret)); @@ -4264,6 +4264,7 @@ int ObSql::handle_text_execute(const ObStmt *basic_stmt, { int ret = OB_SUCCESS; const ObExecuteStmt *exec_stmt = static_cast(basic_stmt); + sql_ctx.is_text_ps_mode_ = true; if (OB_ISNULL(exec_stmt)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("stmt is NULL", K(ret), KPC(exec_stmt), KPC(basic_stmt)); diff --git a/src/sql/ob_sql_context.cpp b/src/sql/ob_sql_context.cpp index fb7014edec..f34b677390 100644 --- a/src/sql/ob_sql_context.cpp +++ b/src/sql/ob_sql_context.cpp @@ -233,6 +233,7 @@ ObSqlCtx::ObSqlCtx() is_protocol_weak_read_(false), flashback_query_expr_(nullptr), is_execute_call_stmt_(false), + is_text_ps_mode_(false), reroute_info_(nullptr) { sql_id_[0] = '\0'; @@ -279,6 +280,7 @@ void ObSqlCtx::reset() stmt_type_ = stmt::T_NONE; cur_plan_ = nullptr; is_execute_call_stmt_ = false; + is_text_ps_mode_ = false; } //release dynamic allocated memory @@ -292,6 +294,7 @@ void ObSqlCtx::clear() multi_stmt_rowkey_pos_.reset(); spm_ctx_.bl_key_.reset(); cur_stmt_ = nullptr; + is_text_ps_mode_ = false; } OB_SERIALIZE_MEMBER(ObSqlCtx, stmt_type_); diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 0933de3e17..d041717387 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -500,6 +500,7 @@ public: ObRawExpr *flashback_query_expr_; ObSpmCacheCtx spm_ctx_; bool is_execute_call_stmt_; + bool is_text_ps_mode_; private: share::ObFeedbackRerouteInfo *reroute_info_; }; diff --git a/src/sql/parser/sql_parser_mysql_mode.l b/src/sql/parser/sql_parser_mysql_mode.l index cdbdae036c..83b063a3ac 100644 --- a/src/sql/parser/sql_parser_mysql_mode.l +++ b/src/sql/parser/sql_parser_mysql_mode.l @@ -1348,7 +1348,6 @@ BEGIN(in_c_comment); return QUESTIONMARK; } } - ":"{identifier} { ParseResult *p = (ParseResult *)yyextra; check_value(yylval); @@ -1365,7 +1364,6 @@ BEGIN(in_c_comment); if (OB_UNLIKELY(p->question_mark_ctx_.by_ordinal_)) { YY_UNEXPECTED_ERROR("Ordinal binding and Named binding cannot be combined\n"); } - copy_and_replace_questionmark(p, yylloc->first_column, yylloc->last_column, yylval->node->value_); if (IS_FAST_PARAMETERIZE) { yylval->node->raw_text_ = parse_strdup(yytext, p->malloc_pool_, &(yylval->node->text_len_)); check_malloc(yylval->node->raw_text_);