[FEAT MERGE]implement user-defined rewrite rules

This commit is contained in:
obdev
2022-12-30 08:10:42 +00:00
committed by ob-robot
parent 21c0bac716
commit 9dcc0a529e
100 changed files with 5540 additions and 315 deletions

View File

@ -73,6 +73,9 @@
#include "observer/omt/ob_tenant_config_mgr.h"
#include "sql/executor/ob_executor_rpc_impl.h"
#include "sql/executor/ob_remote_executor_processor.h"
#include "sql/udr/ob_udr_utils.h"
#include "sql/udr/ob_udr_mgr.h"
#include "sql/udr/ob_udr_analyzer.h"
#include "common/ob_smart_call.h"
namespace oceanbase
@ -819,17 +822,9 @@ int ObSql::fill_result_set(const ObPsStmtId stmt_id, const ObPsStmtInfo &stmt_in
return ret;
}
int ObSql::do_add_ps_cache(const ObString &sql,
const ObString &no_param_sql,
const ObIArray<ObPCParam*> &raw_params,
const ObIArray<int64_t> &fixed_param_idx,
int64_t param_cnt,
int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
ObSchemaGetterGuard &schema_guard,
stmt::StmtType stmt_type,
ObResultSet &result,
bool is_inner_sql,
bool is_sensitive_sql,
int32_t returning_into_parm_num)
ObResultSet &result)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo &session = result.get_session();
@ -845,27 +840,21 @@ int ObSql::do_add_ps_cache(const ObString &sql,
bool duplicate_prepare = false;
// add stmt item
if (OB_FAIL(ps_cache->get_or_add_stmt_item(db_id,
sql,
info_ctx.normalized_sql_,
ps_stmt_item))) {
LOG_WARN("get or create stmt item faield", K(ret), K(db_id), K(sql));
} else if (OB_FAIL(ps_cache->get_or_add_stmt_info(result,
sql,
no_param_sql,
raw_params,
fixed_param_idx,
param_cnt,
LOG_WARN("get or create stmt item faield", K(ret), K(db_id), K(info_ctx.normalized_sql_));
} else if (OB_FAIL(ps_cache->get_or_add_stmt_info(info_ctx,
result,
schema_guard,
stmt_type,
ps_stmt_item,
ref_stmt_info,
returning_into_parm_num))) {
LOG_WARN("get or create stmt info failed", K(ret), K(ps_stmt_item), K(db_id), K(sql));
ref_stmt_info))) {
LOG_WARN("get or create stmt info failed", K(ret), K(ps_stmt_item), K(db_id), K(info_ctx));
} else if (OB_ISNULL(ps_stmt_item) || OB_ISNULL(ref_stmt_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("stmt_item or stmt_info is NULL", K(ret), KP(ps_stmt_item), KP(ref_stmt_info));
}
if (NULL != ref_stmt_info) {
ref_stmt_info->set_is_sensitive_sql(is_sensitive_sql);
ref_stmt_info->set_is_sensitive_sql(info_ctx.is_sensitive_sql_);
}
//add session info
if (OB_SUCC(ret)) {
@ -875,11 +864,11 @@ int ObSql::do_add_ps_cache(const ObString &sql,
ref_stmt_info,
client_stmt_id,
duplicate_prepare,
is_inner_sql))) {
info_ctx.is_inner_sql_))) {
LOG_WARN("prepare_ps_stmt failed", K(ret), K(inner_stmt_id), K(client_stmt_id));
} else {
result.set_statement_id(client_stmt_id);
result.set_stmt_type(stmt_type);
result.set_stmt_type(info_ctx.stmt_type_);
LOG_TRACE("add ps session info", K(ret), K(*ref_stmt_info), K(client_stmt_id),
K(*ps_stmt_item), K(session.get_sessid()));
}
@ -907,9 +896,8 @@ int ObSql::do_real_prepare(const ObString &sql,
ObStmt *basic_stmt = NULL;
stmt::StmtType stmt_type = stmt::T_NONE;
int64_t param_cnt = 0;
ObString normalized_sql;
int32_t returning_into_parm_num = OB_INVALID_COUNT;
ObString no_param_sql;
PsCacheInfoCtx info_ctx;
ObUDRItemMgr::UDRItemRefGuard item_guard;
ObIAllocator &allocator = result.get_mem_pool();
ObSQLSessionInfo &session = result.get_session();
ObExecContext &ectx = result.get_exec_context();
@ -926,11 +914,15 @@ int ObSql::do_real_prepare(const ObString &sql,
ObPlanCacheCtx pc_ctx(sql, true, /*is_ps_mode*/
allocator, context, ectx, session.get_effective_tenant_id());
ParamStore param_store( (ObWrapperAllocator(&allocator)) );
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(session.get_effective_tenant_id()));
pc_ctx.set_is_inner_sql(is_inner_sql);
CHECK_COMPATIBILITY_MODE(context.session_info_);
if (OB_ISNULL(context.session_info_) || OB_ISNULL(context.schema_guard_)) {
if (!tenant_config.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant config is invalid", K(ret));
} else if (OB_ISNULL(context.session_info_) || OB_ISNULL(context.schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is NULL", K(ret));
} else if (OB_FAIL(parser.parse(sql,
@ -954,7 +946,7 @@ int ObSql::do_real_prepare(const ObString &sql,
&& context.is_prepare_stage_
&& context.is_pre_execute_)) {
param_cnt = parse_result.question_mark_ctx_.count_;
normalized_sql = sql;
info_ctx.normalized_sql_ = sql;
if (stmt::T_ANONYMOUS_BLOCK == stmt_type
&& context.is_prepare_protocol_
&& context.is_prepare_stage_
@ -974,7 +966,13 @@ int ObSql::do_real_prepare(const ObString &sql,
}
if (OB_FAIL(generate_stmt(parse_result, NULL, context, allocator, result, basic_stmt))) {
LOG_WARN("generate stmt failed", K(ret));
} else if (!is_from_pl
&& !is_inner_sql
&& tenant_config->enable_user_defined_rewrite_rules
&& OB_FAIL(ObUDRUtils::match_udr_item(sql, session, allocator, item_guard))) {
LOG_WARN("failed to match rewrite rule", K(ret));
} else if (ObStmt::is_dml_stmt(stmt_type)
&& NULL == item_guard.get_ref_obj()
&& !ObStmt::is_show_stmt(stmt_type)
&& !is_inner_sql
&& !is_from_pl
@ -992,7 +990,7 @@ int ObSql::do_real_prepare(const ObString &sql,
pc_ctx.fixed_param_idx_.reset();
pc_ctx.fp_result_.raw_params_.reset();
} else {
no_param_sql = pc_ctx.sql_ctx_.spm_ctx_.bl_key_.constructed_sql_;
info_ctx.no_param_sql_ = pc_ctx.sql_ctx_.spm_ctx_.bl_key_.constructed_sql_;
}
}
@ -1019,40 +1017,37 @@ int ObSql::do_real_prepare(const ObString &sql,
//需要确保prepare的文本与客户端发过来的一致
if (is_inner_sql) {
// pl
normalized_sql = basic_stmt->get_query_ctx()->get_sql_stmt();
info_ctx.normalized_sql_ = basic_stmt->get_query_ctx()->get_sql_stmt();
} else if (result.is_returning() && ObStmt::is_dml_write_stmt(stmt_type)) {
normalized_sql = sql;
no_param_sql = basic_stmt->get_query_ctx()->get_sql_stmt();
info_ctx.normalized_sql_ = sql;
info_ctx.no_param_sql_ = basic_stmt->get_query_ctx()->get_sql_stmt();
} else {
normalized_sql = sql;
info_ctx.normalized_sql_ = sql;
}
}
if (OB_SUCC(ret)) {
if (basic_stmt->is_insert_stmt() || basic_stmt->is_update_stmt() || basic_stmt->is_delete_stmt()) {
ObDelUpdStmt *dml_stmt = static_cast<ObDelUpdStmt*>(basic_stmt);
if (dml_stmt->get_returning_into_exprs().count() != 0 && dml_stmt->is_returning()) {
returning_into_parm_num = dml_stmt->get_returning_into_exprs().count();
info_ctx.num_of_returning_into_ = dml_stmt->get_returning_into_exprs().count();
}
}
}
LOG_INFO("generate new stmt", K(ret), K(param_cnt), K(stmt_type), K(no_param_sql),
K(normalized_sql), K(sql), K(returning_into_parm_num));
LOG_INFO("generate new stmt", K(ret), K(param_cnt), K(stmt_type), K(info_ctx.no_param_sql_),
K(info_ctx.normalized_sql_), K(sql), K(info_ctx.num_of_returning_into_));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(do_add_ps_cache(normalized_sql,
no_param_sql,
pc_ctx.fp_result_.raw_params_,
pc_ctx.fixed_param_idx_,
param_cnt,
*context.schema_guard_,
stmt_type,
result,
is_inner_sql,
context.is_sensitive_,
returning_into_parm_num))) {
info_ctx.param_cnt_ = param_cnt;
info_ctx.stmt_type_ = stmt_type;
info_ctx.is_inner_sql_ = is_inner_sql;
info_ctx.is_sensitive_sql_ = context.is_sensitive_;
info_ctx.raw_params_ = &pc_ctx.fp_result_.raw_params_;
info_ctx.fixed_param_idx_ = &pc_ctx.fixed_param_idx_;
info_ctx.raw_sql_.assign_ptr(sql.ptr(), sql.length());
if (OB_FAIL(do_add_ps_cache(info_ctx, *context.schema_guard_, result))) {
LOG_WARN("add to ps plan cache failed",
K(ret), K(normalized_sql), K(param_cnt));
K(ret), K(info_ctx.normalized_sql_), K(param_cnt));
}
}
//if the error code is ob_timeout, we add more error info msg for dml query.
@ -1065,7 +1060,7 @@ int ObSql::do_real_prepare(const ObString &sql,
IS_SHOW_STMT(parse_result.result_tree_->children_[0]->type_))) {
LOG_USER_ERROR(OB_TIMEOUT, THIS_WORKER.get_timeout_ts() - session.get_query_start_time());
}
LOG_INFO("add ps cache", K(normalized_sql), K(param_cnt), K(ret));
LOG_INFO("add ps cache", K(info_ctx.normalized_sql_), K(param_cnt), K(ret));
return ret;
}
@ -1335,6 +1330,7 @@ int ObSql::construct_ps_param(const ParamStore &params,
ObPlanCacheCtx &phy_ctx)
{
int ret = OB_SUCCESS;
phy_ctx.fp_result_.ps_params_.reset();
phy_ctx.fp_result_.ps_params_.set_allocator(&phy_ctx.allocator_);
phy_ctx.fp_result_.ps_params_.set_capacity(params.count());
for (int i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
@ -1533,8 +1529,7 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
} else if (OB_FAIL(construct_param_store(ps_params, pctx->get_param_store_for_update()))) {
LOG_WARN("construct param store failed", K(ret));
} else {
const ObString &sql =
!ps_info->get_no_param_sql().empty() ? ps_info->get_no_param_sql() : ps_info->get_ps_sql();
const ObString &sql = !ps_info->get_no_param_sql().empty() ? ps_info->get_no_param_sql() : ps_info->get_ps_sql();
context.cur_sql_ = sql;
#ifndef NDEBUG
LOG_INFO("Begin to handle execute stmtement", K(session.get_sessid()), K(sql));
@ -1670,6 +1665,7 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
const uint64_t tenant_id = session->get_effective_tenant_id();
bool use_plan_cache = session->get_local_ob_enable_plan_cache();
context.self_add_plan_ = false;
context.cur_sql_ = trimed_stmt;
pc_ctx = new (pc_ctx) ObPlanCacheCtx(trimed_stmt,
remote_sql_info.use_ps_, /*is_ps_mode*/
allocator,
@ -1698,7 +1694,8 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
//切割出来的query最后走batched multi stmt的逻辑去查询plan cache和生成计划
ObParser parser(allocator,
session->get_sql_mode(),
session->get_local_collation_connection());
session->get_local_collation_connection(),
pc_ctx->def_name_ctx_);
ObMPParseStat parse_stat;
if (OB_FAIL(parser.split_multiple_stmt(remote_sql_info.remote_sql_, queries, parse_stat))) {
LOG_WARN("split multiple stmt failed", K(ret), K(remote_sql_info));
@ -3084,7 +3081,7 @@ int ObSql::get_outline_data(ObSqlCtx &context,
}
if (OB_SUCC(ret) && !outline_content.empty()) {
ObParser parser(pc_ctx.allocator_, session->get_sql_mode(), session->get_local_collation_connection());
ObParser parser(pc_ctx.allocator_, session->get_sql_mode(), session->get_local_collation_connection(), pc_ctx.def_name_ctx_);
ObSqlString sql_helper;
ObString temp_outline_sql;
if (OB_FAIL(sql_helper.assign_fmt("select %.*s 1 from dual", outline_content.length(),
@ -3209,9 +3206,9 @@ int ObSql::parser_and_check(const ObString &outlined_stmt,
} else {
pctx->reset_datum_param_store();
pctx->get_param_store_for_update().reuse();
ObParser parser(allocator, session->get_sql_mode(), session->get_local_collation_connection());
ObParser parser(allocator, session->get_sql_mode(), session->get_local_collation_connection(), pc_ctx.def_name_ctx_);
if (OB_FAIL(parser.parse(outlined_stmt, parse_result,
STD_MODE,
pc_ctx.is_rewrite_sql_ ? UDR_SQL_MODE : STD_MODE,
pc_ctx.sql_ctx_.handle_batched_multi_stmt()))) {
LOG_WARN("Generate syntax tree failed", K(outlined_stmt), K(ret));
} else if (pc_ctx.is_ps_mode_
@ -3436,9 +3433,13 @@ int ObSql::pc_add_plan(ObPlanCacheCtx &pc_ctx,
pc_ctx.fp_result_.pc_key_.namespace_ = ObLibCacheNameSpace::NS_CRSR;
plan_added = false;
bool is_batch_exec = pc_ctx.sql_ctx_.multi_stmt_item_.is_batched_multi_stmt();
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (OB_ISNULL(phy_plan) || OB_ISNULL(plan_cache)) {
ret = OB_NOT_INIT;
LOG_WARN("Fail to generate plan", K(phy_plan), K(plan_cache));
} else if (!tenant_config.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant config is invalid", K(ret));
} else if (OB_USE_PLAN_CACHE_NONE == phy_plan->get_phy_plan_hint().plan_cache_policy_) {
LOG_DEBUG("Hint not use plan cache");
} else if (OB_FAIL(result.to_plan(pc_ctx.is_ps_mode_, phy_plan))) {
@ -3452,9 +3453,15 @@ int ObSql::pc_add_plan(ObPlanCacheCtx &pc_ctx,
pc_ctx.sql_ctx_.spm_ctx_.bl_key_.sql_id_,
phy_plan->stat_.sql_id_))) {
LOG_WARN("failed to ob write string", K(ret));
} else if (pc_ctx.is_rewrite_sql_ && OB_FAIL(phy_plan->set_rule_name(pc_ctx.rule_name_))) {
LOG_WARN("failed to ob write string", K(ret));
} else {
sql::ObUDRMgr *rule_mgr = MTL(sql::ObUDRMgr*);
phy_plan->set_outline_state(outline_state);
phy_plan->stat_.db_id_ = pc_ctx.sql_ctx_.spm_ctx_.bl_key_.db_id_;
phy_plan->stat_.is_rewrite_sql_ = pc_ctx.is_rewrite_sql_;
phy_plan->stat_.rule_version_ = rule_mgr->get_rule_version();
phy_plan->stat_.enable_udr_ = tenant_config->enable_user_defined_rewrite_rules;
if (pc_ctx.is_ps_mode_) {
//远程SQL第二次进入plan,将raw_sql作为pc_key存入plan cache中,
@ -3527,7 +3534,8 @@ void ObSql::check_template_sql_can_be_prepare(ObPlanCacheCtx &pc_ctx, ObPhysical
ParseResult parse_result;
ObParser parser(pc_ctx.allocator_,
session->get_sql_mode(),
session->get_local_collation_connection());
session->get_local_collation_connection(),
pc_ctx.def_name_ctx_);
if (OB_FAIL(parser.parse(temp_sql, parse_result))) {
LOG_DEBUG("generate syntax tree failed", K(temp_sql), K(ret));
} else {
@ -3725,6 +3733,58 @@ int ObSql::need_add_plan(const ObPlanCacheCtx &pc_ctx,
return ret;
}
int ObSql::pc_add_udr_plan(const ObUDRItemMgr::UDRItemRefGuard &item_guard,
ObPlanCacheCtx &pc_ctx,
ObResultSet &result,
ObOutlineState &outline_state,
bool& plan_added)
{
int ret = OB_SUCCESS;
int get_plan_err = OB_SUCCESS;
bool add_plan_to_pc = false;
ParseResult parse_result;
ObIAllocator &allocator = result.get_mem_pool();
ObSQLSessionInfo &session = result.get_session();
ObPlanCache *plan_cache = session.get_plan_cache();
bool is_enable_transform_tree = !session.get_enable_exact_mode();
ObExecContext &ectx = result.get_exec_context();
ObPhysicalPlanCtx *pctx = ectx.get_physical_plan_ctx();
ParamStore param_store( (ObWrapperAllocator(&allocator)) );
const ObString &raw_sql = pc_ctx.raw_sql_;
ObPlanCacheCtx tmp_pc_ctx(raw_sql, pc_ctx.is_ps_mode_,
allocator, pc_ctx.sql_ctx_, ectx, session.get_effective_tenant_id());
tmp_pc_ctx.fp_result_ = pc_ctx.fp_result_;
tmp_pc_ctx.normal_parse_const_cnt_ = pc_ctx.normal_parse_const_cnt_;
tmp_pc_ctx.set_is_rewrite_sql(true);
tmp_pc_ctx.rule_name_ = pc_ctx.rule_name_;
const ObUDRItem *rule_item = item_guard.get_ref_obj();
ObParser parser(allocator, session.get_sql_mode(),
session.get_local_collation_connection(),
pc_ctx.def_name_ctx_);
if (OB_ISNULL(rule_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rule item is null", K(ret));
} else if (OB_FAIL(tmp_pc_ctx.fixed_param_info_list_.assign(rule_item->get_fixed_param_value_array()))) {
LOG_WARN("failed to assign fixed param info list", K(ret));
} else if (OB_FAIL(tmp_pc_ctx.dynamic_param_info_list_.assign(rule_item->get_dynamic_param_info_array()))) {
LOG_WARN("failed to assign dynamic param info list", K(ret));
} else if (OB_FAIL(tmp_pc_ctx.tpl_sql_const_cons_.assign(pc_ctx.tpl_sql_const_cons_))) {
LOG_WARN("failed to assign tpl sql const cons", K(ret));
} else if (OB_FAIL(parser.parse(raw_sql, parse_result))) {
LOG_WARN("failed to parse sql", K(ret), K(raw_sql));
} else if (OB_FAIL(ObSqlParameterization::parameterize_syntax_tree(allocator,
false/*is_transform_outline*/,
tmp_pc_ctx,
parse_result.result_tree_,
param_store,
session.get_local_collation_connection()))) {
LOG_WARN("parameterize syntax tree failed", K(ret));
} else if (OB_FAIL(pc_add_plan(tmp_pc_ctx, result, outline_state, plan_cache, plan_added))) {
LOG_WARN("failed to add plan", K(ret));
}
return ret;
}
OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
ObSqlCtx &context,
ObResultSet &result,
@ -3741,6 +3801,9 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
ParseResult parse_result;
ParseResult outline_parse_result;
bool add_plan_to_pc = false;
bool is_match_udr = false;
ObUDRItemMgr::UDRItemRefGuard item_guard;
UDRBackupRecoveryGuard backup_recovery_guard(context, pc_ctx);
ObSQLSessionInfo &session = result.get_session();
ObPlanCache *plan_cache = session.get_plan_cache();
ObSpmCacheCtx &spm_ctx = context.spm_ctx_;
@ -3760,6 +3823,15 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
context.multi_stmt_item_.is_batched_multi_stmt() &&
OB_FAIL(get_first_batched_multi_stmt(context.multi_stmt_item_, outlined_stmt))) {
LOG_WARN("failed to get first batched stmt item", K(ret));
} else if (OB_FAIL(ObUDRUtils::match_udr_and_refill_ctx(outlined_stmt,
context,
result,
pc_ctx,
is_match_udr,
item_guard))) {
LOG_WARN("failed to match udr and refill ctx", K(ret));
} else if (is_match_udr
&& FALSE_IT(outlined_stmt = item_guard.get_ref_obj()->get_replacement())) {
} else if (OB_FAIL(handle_parser(outlined_stmt,
result.get_exec_context(),
pc_ctx,
@ -3797,6 +3869,7 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
} else {
LOG_WARN("Failed to generate plan", K(ret), K(result.get_exec_context().need_disconnect()));
}
} else if (OB_FALSE_IT(backup_recovery_guard.recovery())) {
} else if (OB_FAIL(need_add_plan(pc_ctx,
result,
use_plan_cache,
@ -3804,7 +3877,13 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
LOG_WARN("get need_add_plan failed", K(ret));
} else if (!add_plan_to_pc) {
// do nothing
} else if (OB_FAIL(pc_add_plan(pc_ctx, result, outline_state, plan_cache, plan_added))) {
} else if (is_match_udr && OB_FAIL(pc_add_udr_plan(item_guard,
pc_ctx,
result,
outline_state,
plan_added))) {
LOG_WARN("fail to add plan to plan cache", K(ret));
} else if (!is_match_udr && OB_FAIL(pc_add_plan(pc_ctx, result, outline_state, plan_cache, plan_added))) {
LOG_WARN("fail to add plan to plan cache", K(ret));
}
//if the error code is ob_timeout, we add more error info msg for dml query.