[FEAT MERGE] 4.1 PL/SQL enhence & development

This commit is contained in:
obdev
2023-01-09 08:47:31 +00:00
committed by ob-robot
parent 322032b9ca
commit 08802d94f6
98 changed files with 5260 additions and 2676 deletions

View File

@ -237,7 +237,10 @@ int ObSql::stmt_list_field(const common::ObString &table_name,
return OB_NOT_IMPLEMENT;
}
int ObSql::fill_result_set(ObResultSet &result_set, ObSqlCtx *context, const bool is_ps_mode, ObStmt &basic_stmt)
int ObSql::fill_result_set(ObResultSet &result_set,
ObSqlCtx *context,
const PlanCacheMode mode,
ObStmt &basic_stmt)
{
int ret = OB_SUCCESS;
@ -265,7 +268,7 @@ int ObSql::fill_result_set(ObResultSet &result_set, ObSqlCtx *context, const boo
ObCharset::get_default_collation_by_mode(result_charset, lib::is_oracle_mode());
switch (stmt->get_stmt_type()) {
case stmt::T_SELECT: {
if (OB_FAIL(fill_select_result_set(result_set, context, is_ps_mode, collation_type, type_name,
if (OB_FAIL(fill_select_result_set(result_set, context, mode, collation_type, type_name,
basic_stmt, field))) {
LOG_WARN("fill select result set failed", K(ret));
}
@ -517,7 +520,9 @@ int ObSql::fill_result_set(ObResultSet &result_set, ObSqlCtx *context, const boo
const int64_t question_marks_count = pctx->get_is_ps_rewrite_sql() ?
pctx->get_orig_question_mark_cnt() : stmt->get_query_ctx()->get_prepare_param_count();
if (OB_SUCC(ret) && is_ps_mode && question_marks_count > 0) { // param column is only needed in ps mode
// param column is only needed in ps mode
if (OB_SUCC(ret) && question_marks_count > 0
&& (PC_PS_MODE == mode || PC_PL_MODE == mode)) {
if (OB_FAIL(result_set.reserve_param_columns(question_marks_count))) {
LOG_WARN("reserve param columns failed", K(ret), K(question_marks_count));
}
@ -548,7 +553,8 @@ int ObSql::fill_result_set(ObResultSet &result_set, ObSqlCtx *context, const boo
}
// for resolve returning_params and only work for ps_mode
// SQL: insert/update/delete ...returning expr1 ... into ?...
if (OB_SUCC(ret) && ObStmt::is_dml_write_stmt(stmt->get_stmt_type()) && is_ps_mode) {
if (OB_SUCC(ret) && ObStmt::is_dml_write_stmt(stmt->get_stmt_type())
&& (PC_PS_MODE == mode || PC_PL_MODE == mode)) {
del_upd_stmt = static_cast<ObDelUpdStmt *>(stmt);
if (del_upd_stmt->is_returning()) {
int64_t returning_param_num = del_upd_stmt->get_returning_exprs().count();
@ -569,7 +575,7 @@ int ObSql::fill_result_set(ObResultSet &result_set, ObSqlCtx *context, const boo
return ret;
}
int ObSql::fill_select_result_set(ObResultSet &result_set, ObSqlCtx *context, const bool is_ps_mode,
int ObSql::fill_select_result_set(ObResultSet &result_set, ObSqlCtx *context, const PlanCacheMode mode,
ObCollationType collation_type, const ObString &type_name,
ObStmt &basic_stmt, ObField &field)
{
@ -748,7 +754,7 @@ int ObSql::fill_select_result_set(ObResultSet &result_set, ObSqlCtx *context, co
}
}
}
if (OB_SUCC(ret) && !is_ps_mode) {
if (OB_SUCC(ret) && !(PC_PS_MODE == mode || PC_PL_MODE == mode)) {
void *buf = NULL;
if (OB_ISNULL(buf = alloc.alloc(sizeof(ObParamedSelectItemCtx)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -913,8 +919,8 @@ int ObSql::do_real_prepare(const ObString &sql,
bool is_from_pl = (NULL != context.secondary_namespace_ || result.is_simple_ps_protocol());
ObPsPrepareStatusGuard ps_status_guard(session, is_from_pl);
ObPlanCacheCtx pc_ctx(sql, true, /*is_ps_mode*/
allocator, context, ectx, session.get_effective_tenant_id());
ObPlanCacheCtx pc_ctx(sql, PC_PS_MODE, allocator, context, ectx,
session.get_effective_tenant_id());
ParamStore param_store( (ObWrapperAllocator(&allocator)) );
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(session.get_effective_tenant_id()));
pc_ctx.set_is_inner_sql(is_inner_sql);
@ -1005,7 +1011,7 @@ int ObSql::do_real_prepare(const ObString &sql,
LOG_WARN("query ctx is null", K(ret));
} else if (stmt::T_CALL_PROCEDURE == basic_stmt->get_stmt_type()
&& FALSE_IT(result.set_cmd(dynamic_cast<ObICmd*>(basic_stmt)))) {
} else if (OB_FAIL(fill_result_set(result, &context, true, *basic_stmt))) {
} else if (OB_FAIL(fill_result_set(result, &context, PC_PS_MODE, *basic_stmt))) {
LOG_WARN("Failed to fill result set", K(ret));
} else if (OB_ISNULL(result.get_param_fields())) {
ret = OB_INVALID_ARGUMENT;
@ -1066,6 +1072,376 @@ int ObSql::do_real_prepare(const ObString &sql,
return ret;
}
ObSql::TimeoutGuard::TimeoutGuard(ObSQLSessionInfo &session)
: session_(session)
{
int ret = OB_SUCCESS;
worker_timeout_ = THIS_WORKER.get_timeout_ts();
if (OB_FAIL(session_.get_query_timeout(query_timeout_))
|| OB_FAIL(session_.get_tx_timeout(trx_timeout_))) {
LOG_ERROR("get timeout failed", KR(ret), K(query_timeout_), K(trx_timeout_));
}
}
ObSql::TimeoutGuard::~TimeoutGuard()
{
int ret = OB_SUCCESS;
if (THIS_WORKER.get_timeout_ts() != worker_timeout_) {
THIS_WORKER.set_timeout_ts(worker_timeout_);
}
int64_t query_timeout = 0;
int64_t trx_timeout = 0;
if (OB_FAIL(session_.get_query_timeout(query_timeout))
|| OB_FAIL(session_.get_tx_timeout(trx_timeout))) {
LOG_ERROR("get timeout failed", KR(ret), K(query_timeout), K(trx_timeout));
} else {
if (query_timeout != query_timeout_ || trx_timeout != trx_timeout_) {
ObObj query_val, trx_val;
query_val.set_int(query_timeout_);
trx_val.set_int(trx_timeout_);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(session_.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_val))) {
LOG_WARN("set sys variable failed", K(ret), K(OB_SV_QUERY_TIMEOUT), K(query_val));
} else if (OB_FAIL(session_.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_val))) {
LOG_WARN("set sys variable failed", K(ret), K(OB_SV_TRX_TIMEOUT), K(trx_val));
}
}
}
}
int ObSql::set_timeout_for_pl(ObSQLSessionInfo &session_info, int64_t &abs_timeout_us)
{
int ret = OB_SUCCESS;
int64_t query_timeout;
if (OB_FAIL(session_info.get_query_timeout(query_timeout))) {
// do nothing
} else {
OX (abs_timeout_us = session_info.get_query_start_time() > 0
? session_info.get_query_start_time() + query_timeout
: ObTimeUtility::current_time() + query_timeout);
if (THIS_WORKER.get_timeout_ts() > abs_timeout_us) {
OX (THIS_WORKER.set_timeout_ts(abs_timeout_us));
}
}
return ret;
}
/*
* sql: pl 中的 sql 语句
* PLPrepareCtx: prepare 用到的相关信息
* PLPrepareResult: prepare 后的输出结果
*/
int ObSql::handle_pl_prepare(const ObString &sql,
ObSPIService::PLPrepareCtx &pl_prepare_ctx,
ObSPIService::PLPrepareResult &pl_prepare_result)
{
int ret = OB_SUCCESS;
ObString cur_query;
ObString trimed_stmt = const_cast<ObString &>(sql).trim();
ObSqlCtx &context = pl_prepare_result.sql_ctx_;
int64_t param_cnt = 0;
ObString normalized_sql;
ParseResult parse_result;
ObStmt *basic_stmt = NULL;
int64_t cur_timeout_us = 0;
stmt::StmtType stmt_type = stmt::T_NONE;
ObSchemaGetterGuard &schema_guard = pl_prepare_result.schema_guard_;
ObSQLSessionInfo &sess = pl_prepare_ctx.sess_info_;
TimeoutGuard timeout_guard(sess);
int64_t old_query_start_time = sess.get_query_start_time();
sess.set_query_start_time(ObTimeUtility::current_time());
CK (OB_NOT_NULL(pl_prepare_result.get_allocator()));
CK (OB_NOT_NULL(pl_prepare_result.result_set_));
if (OB_SUCC(ret)) {
ObIAllocator &allocator = *pl_prepare_result.get_allocator();
ObParser parser(allocator, sess.get_sql_mode(), sess.get_local_collation_connection());
ParseMode parse_mode = pl_prepare_ctx.is_dbms_sql_ ? DBMS_SQL_MODE :
pl_prepare_ctx.is_dynamic_sql_ ? DYNAMIC_SQL_MODE :
sess.is_for_trigger_package() ? TRIGGER_MODE : STD_MODE;
if (OB_SUCC(ret)) {
context.is_dynamic_sql_ = pl_prepare_ctx.is_dynamic_sql_;
context.is_dbms_sql_ = pl_prepare_ctx.is_dbms_sql_;
context.is_cursor_ = pl_prepare_ctx.is_cursor_;
context.secondary_namespace_ = pl_prepare_ctx.secondary_ns_;
context.session_info_ = &sess;
context.disable_privilege_check_ = OB_SYS_TENANT_ID == sess.get_priv_tenant_id()
? PRIV_CHECK_FLAG_DISABLE
: PRIV_CHECK_FLAG_IN_PL;
context.exec_type_ = PLSql;
context.is_prepare_protocol_ = true;
context.is_prepare_stage_ = true;
}
if (OB_SUCC(ret)) {
WITH_CONTEXT(pl_prepare_result.mem_context_) {
ObResultSet &result = *pl_prepare_result.result_set_;
LinkExecCtxGuard link_guard(result.get_session(), result.get_exec_context());
if (trimed_stmt.empty()) {
ret = OB_ERR_EMPTY_QUERY;
LOG_WARN("query is empty", K(ret));
} else if (OB_FAIL(set_timeout_for_pl(sess, cur_timeout_us))) {
LOG_WARN("failed to set timeout for pl", K(ret));
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
sess.get_effective_tenant_id(), schema_guard))) {
LOG_WARN("failed to get tenant schema guard", K(ret));
} else if (FALSE_IT(context.schema_guard_ = &schema_guard)) {
} else if (OB_FAIL(init_result_set(context, result))) {
LOG_WARN("failed to init result set", K(ret));
} else if (OB_FAIL(ob_write_string(allocator, sess.get_current_query_string(), cur_query))) {
LOG_WARN("failed to write string", K(ret));
} else if (OB_FAIL(sess.store_query_string(trimed_stmt))) {
LOG_WARN("store query string fail", K(ret));
} else if (OB_FAIL(parser.parse(sql, parse_result, parse_mode, false, false))) {
LOG_WARN("generate syntax tree failed", K(sql), K(ret));
} else if (is_mysql_mode() && ObSQLUtils::is_mysql_ps_not_support_stmt(parse_result)) {
ret = OB_ER_UNSUPPORTED_PS;
LOG_WARN("This command is not supported in the prepared statement protocol yet", K(ret));
} else if (NULL == pl_prepare_ctx.secondary_ns_ && !pl_prepare_ctx.is_dynamic_sql_) {
result.set_simple_ps_protocol();
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ObResolverUtils::resolve_stmt_type(parse_result, stmt_type))) {
LOG_WARN("failed to resolve stmt type", K(ret));
} else if (FALSE_IT(result.set_stmt_type(stmt_type))) {
} else if (result.is_simple_ps_protocol()
|| (stmt::T_ANONYMOUS_BLOCK == stmt_type && context.is_prepare_protocol_
&& context.is_prepare_stage_ && context.is_pre_execute_)) {
if (parse_result.is_dynamic_sql_) {
context.is_dynamic_sql_ = true;
}
param_cnt = parse_result.question_mark_ctx_.count_;
normalized_sql = context.is_dynamic_sql_ && parse_result.no_param_sql_len_ > 0
? ObString(parse_result.no_param_sql_len_, parse_result.no_param_sql_) : sql;
if (stmt::T_ANONYMOUS_BLOCK == stmt_type && context.is_prepare_protocol_
&& context.is_prepare_stage_ && context.is_pre_execute_) {
OZ (result.reserve_param_columns(param_cnt));
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
ObField param_field;
param_field.type_.set_type(ObIntType);
param_field.cname_ = ObString::make_string("?");
OZ (result.add_param_column(param_field), K(param_field), K(i), K(param_cnt));
}
}
} else {
if (parse_result.is_dynamic_sql_) {
context.is_dynamic_sql_ = true;
}
if (context.is_dynamic_sql_ && !context.is_dbms_sql_) {
parse_result.input_sql_ = parse_result.no_param_sql_;
parse_result.input_sql_len_ = parse_result.no_param_sql_len_;
}
if (OB_FAIL(generate_stmt(parse_result, NULL, context, allocator, result, basic_stmt))) {
LOG_WARN("generate stmt failed", K(ret));
} else if (OB_ISNULL(basic_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("generate stmt success, but stmt is NULL", K(ret));
} else if (OB_ISNULL(basic_stmt->get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query ctx is null", K(ret));
} else if (stmt::T_CALL_PROCEDURE == basic_stmt->get_stmt_type()
&& FALSE_IT(result.set_cmd(dynamic_cast<ObICmd*>(basic_stmt)))) {
} else if (OB_FAIL(fill_result_set(result, &context, PC_PL_MODE, *basic_stmt))) {
LOG_WARN("Failed to fill result set", K(ret));
} else if (OB_ISNULL(result.get_param_fields())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(result.get_param_fields()), K(ret));
} else {
normalized_sql = basic_stmt->get_query_ctx()->get_sql_stmt();
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ob_write_string(allocator, normalized_sql, result.get_stmt_ps_sql(), true))) {
LOG_WARN("failed to write string", K(trimed_stmt), K(ret));
} else if (OB_FAIL(sess.store_query_string(cur_query))) {
LOG_WARN("failed to store query string", K(ret));
} else {
// do nothing
}
}
}
}
sess.set_query_start_time(old_query_start_time);
return ret;
}
int ObSql::handle_sql_execute(const ObString &sql,
ObSqlCtx &context,
ObResultSet &result,
ParamStore &org_params,
PlanCacheMode mode)
{
int ret = OB_SUCCESS;
int get_plan_err = OB_SUCCESS;
bool use_plan_cache = false;
ObIAllocator &allocator = result.get_mem_pool();
ObSQLSessionInfo *session = context.session_info_;
ObExecContext &ectx = result.get_exec_context();
ObPhysicalPlanCtx *pctx = ectx.get_physical_plan_ctx();
ParamStore params( (ObWrapperAllocator(allocator)) );
ParamStore *ab_params = NULL;
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(session) || OB_ISNULL(session->get_plan_cache())) {
ret = OB_INVALID_ARGUMENT;
} else if ((mode == PC_PS_MODE || mode == PC_PL_MODE) && OB_ISNULL(pctx)) {
ret = OB_INVALID_ARGUMENT;
} else {
use_plan_cache = session->get_local_ob_enable_plan_cache();
}
ObPlanCacheCtx pc_ctx(sql, mode, allocator, context, ectx, session->get_effective_tenant_id());
if (OB_FAIL(ret)) {
// do nothing
} else if (mode == PC_PL_MODE) {
if (OB_FAIL(reconstruct_pl_params_store(allocator, context, org_params, params, ab_params))) {
LOG_WARN("failed to reconstruct pl params", K(ret));
} else if (context.multi_stmt_item_.is_batched_multi_stmt() && OB_ISNULL(ab_params)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pl ab params is null", K(ret));
} else if (OB_FAIL(construct_param_store(params, pctx->get_param_store_for_update()))) {
LOG_WARN("construct param store failed", K(ret));
} else if (OB_FAIL(construct_parameterized_params(params, pc_ctx))) {
LOG_WARN("construct parameterized params failed", K(ret));
} else {
pc_ctx.normal_parse_const_cnt_ = params.count();
pc_ctx.set_is_parameterized_execute();
pc_ctx.ab_params_ = ab_params;
}
} else if (mode == PC_PS_MODE) {
// TODO, not support ps now.
ret = OB_ERR_UNEXPECTED;
}
if (OB_SUCC(ret)) {
if (!use_plan_cache) {
// do nothing
} else if (OB_FAIL(pc_get_plan_and_fill_result(pc_ctx, result,
get_plan_err, ectx.get_need_disconnect_for_update()))) {
LOG_WARN("failed to get plan", K(ret));
}
}
if (OB_SUCC(ret)) {
if (!result.get_is_from_plan_cache()) {
if (mode == PC_PS_MODE || mode == PC_PL_MODE) {
pctx->get_param_store_for_update().reset();
}
if (OB_FAIL(handle_physical_plan(sql, context, result, pc_ctx, get_plan_err, mode))) {
if (OB_ERR_PROXY_REROUTE == ret) {
LOG_DEBUG("fail to handle physical plan", K(ret));
} else {
LOG_WARN("fail to handle physical plan", K(ret));
}
}
}
}
if (OB_SUCC(ret) && !(org_params.count() == 0 && context.is_prepare_protocol_)) {
if (OB_FAIL(after_get_plan(pc_ctx, *session, result.get_physical_plan(),
result.get_is_from_plan_cache(), &params))) {
LOG_WARN("fail to handle after get plan", K(ret));
}
}
return ret;
}
/*!
* sql: 需要被执行的sql语句
* params: 当前sql语句的参数列表
* res: 直接结果集
*/
// TODO baixian.zr/hr351303, remove is_prepare_protocol and is_dynamic_sql
int ObSql::handle_pl_execute(const ObString &sql,
ObSQLSessionInfo &session,
ParamStore &params,
ObResultSet &result,
ObSqlCtx &context,
bool is_prepare_protocol,
bool is_dynamic_sql)
{
int ret = OB_SUCCESS;
int get_plan_err = OB_SUCCESS;
TimeoutGuard timeout_guard(session);
LinkExecCtxGuard link_guard(result.get_session(), result.get_exec_context());
int64_t cur_timeout_us = 0;
context.session_info_ = &session;
ObIAllocator &allocator = result.get_mem_pool();
ObExecContext &ectx = result.get_exec_context();
ObPhysicalPlanCtx *pctx = NULL;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(result.init())) {
LOG_WARN("failed to init result_set", K(ret));
} else if (OB_FAIL(init_result_set(context, result))) {
LOG_WARN("failed to init result set", K(ret));
} else {
context.cur_sql_ = sql;
context.is_dynamic_sql_ = is_dynamic_sql;
context.is_prepare_protocol_ = is_prepare_protocol;
context.disable_privilege_check_ = OB_SYS_TENANT_ID == session.get_priv_tenant_id()
? PRIV_CHECK_FLAG_DISABLE
: PRIV_CHECK_FLAG_IN_PL;
pctx = ectx.get_physical_plan_ctx();
ectx.get_das_ctx().get_schema_guard() = context.schema_guard_;
}
if (OB_SUCC(ret) && is_prepare_protocol && !is_dynamic_sql) {
result.set_simple_ps_protocol();
}
if (OB_SUCC(ret) && params.count() == 0) {
context.is_prepare_protocol_ = false; // text protocol
}
LOG_TRACE("arrive handle pl execute", K(ret), K(sql), K(is_prepare_protocol), K(is_dynamic_sql), K(lbt()));
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(pctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguement", K(ret));
} else if (OB_FAIL(set_timeout_for_pl(session, cur_timeout_us))) {
LOG_WARN("failed to set timeout for pl", K(ret));
} else if (OB_FAIL(session.store_query_string(sql))) {
LOG_WARN("store query string fail", K(ret));
} else if (OB_FAIL(handle_sql_execute(sql, context, result, params, PC_PL_MODE))) {
LOG_WARN("failed to handle sql execute", K(ret));
} else {
result.get_session().set_exec_min_cluster_version();
}
if (OB_FAIL(ret) && OB_SUCCESS == result.get_errcode()) {
result.set_errcode(ret);
}
//todo:@hr351303下面的逻辑后续挪到spi层
if (OB_SUCC(ret)) {
if (OB_ISNULL(context.schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema guard is null");
} else if (OB_FAIL(session.update_query_sensitive_system_variable(*(context.schema_guard_)))) {
LOG_WARN("update query affacted system variable failed", K(ret));
} else if (OB_FAIL(result.open())) {
LOG_WARN("result set open failed", K(ret));
} else {
// do nothing
}
}
if (session.get_in_transaction()) {
if (ObStmt::is_dml_write_stmt(result.get_stmt_type())) {
session.set_has_inner_dml_write(true);
}
}
FLT_SET_TAG(sql_id, context.sql_id_);
return ret;
}
int ObSql::handle_ps_prepare(const ObString &stmt,
ObSqlCtx &context,
ObResultSet &result,
@ -1240,15 +1616,15 @@ int ObSql::add_param_to_param_store(const ObObjParam &param,
return ret;
}
int ObSql::construct_param_store_from_ps_param(const ObPlanCacheCtx &phy_ctx,
int ObSql::construct_param_store_from_parameterized_params(const ObPlanCacheCtx &phy_ctx,
ParamStore &param_store)
{
int ret = OB_SUCCESS;
if (OB_FAIL(param_store.reserve(phy_ctx.fp_result_.ps_params_.count()))) {
if (OB_FAIL(param_store.reserve(phy_ctx.fp_result_.parameterized_params_.count()))) {
LOG_WARN("failed to reserve array", K(ret));
}
for (int i = 0; OB_SUCC(ret) && i < phy_ctx.fp_result_.ps_params_.count(); ++i) {
const common::ObObjParam *param = phy_ctx.fp_result_.ps_params_.at(i);
for (int i = 0; OB_SUCC(ret) && i < phy_ctx.fp_result_.parameterized_params_.count(); ++i) {
const common::ObObjParam *param = phy_ctx.fp_result_.parameterized_params_.at(i);
if (OB_FAIL(add_param_to_param_store(*param, param_store))) {
LOG_WARN("failed to add param to param store", K(ret));
}
@ -1328,15 +1704,15 @@ int ObSql::construct_param_store(const ParamStore &params,
return ret;
}
int ObSql::construct_ps_param(const ParamStore &params,
ObPlanCacheCtx &phy_ctx)
int ObSql::construct_parameterized_params(const ParamStore &params,
ObPlanCacheCtx &phy_ctx)
{
int ret = OB_SUCCESS;
phy_ctx.fp_result_.ps_params_.reset();
phy_ctx.fp_result_.ps_params_.set_allocator(&phy_ctx.allocator_);
phy_ctx.fp_result_.ps_params_.set_capacity(params.count());
phy_ctx.fp_result_.parameterized_params_.reset();
phy_ctx.fp_result_.parameterized_params_.set_allocator(&phy_ctx.allocator_);
phy_ctx.fp_result_.parameterized_params_.set_capacity(params.count());
for (int i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
if (OB_FAIL(phy_ctx.fp_result_.ps_params_.push_back(&params.at(i)))) {
if (OB_FAIL(phy_ctx.fp_result_.parameterized_params_.push_back(&params.at(i)))) {
LOG_WARN("add ps param failed", K(ret));
}
}
@ -1405,10 +1781,14 @@ int ObSql::init_execute_params_for_ab(ObIAllocator &allocator,
ParamStore *&first_group_params)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(first_group_params = static_cast<ParamStore *>(allocator.alloc(sizeof(ParamStore))))) {
if (OB_NOT_NULL(first_group_params)) {
// do nothing
} else if (OB_ISNULL(first_group_params = static_cast<ParamStore *>(allocator.alloc(sizeof(ParamStore))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else if (FALSE_IT(first_group_params = new(first_group_params)ParamStore(ObWrapperAllocator(allocator)))) {
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ObPlanCacheValue::get_one_group_params(0, params_store, *first_group_params))) {
LOG_WARN("fail to get the first group paramsters", K(ret));
@ -1422,6 +1802,34 @@ int ObSql::init_execute_params_for_ab(ObIAllocator &allocator,
return ret;
}
int ObSql::reconstruct_pl_params_store(ObIAllocator &allocator,
ObSqlCtx &context,
const ParamStore &origin_params,
ParamStore &pl_params,
ParamStore *&pl_ab_params)
{
int ret = OB_SUCCESS;
if (context.multi_stmt_item_.is_batched_multi_stmt()) {
ParamStore *first_group_params = &pl_params;
if (OB_FAIL(init_execute_params_for_ab(allocator, origin_params, first_group_params))) {
LOG_WARN("fail to init first batch params", K(ret), K(origin_params));
} else if (OB_ISNULL(pl_ab_params = static_cast<ParamStore *>(allocator.alloc(sizeof(ParamStore))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else if (FALSE_IT(pl_ab_params = new(pl_ab_params)ParamStore(ObWrapperAllocator(allocator)))) {
// do nothing
} else if (OB_FAIL(construct_param_store(origin_params,
*pl_ab_params))) {
LOG_WARN("construct param store failed", K(ret));
}
} else {
if (OB_FAIL(construct_param_store(origin_params, pl_params))) {
LOG_WARN("construct param store failed", K(ret));
}
}
return ret;
}
int ObSql::reconstruct_ps_params_store(ObIAllocator &allocator,
ObSqlCtx &context,
const ParamStore &origin_params,
@ -1545,16 +1953,16 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
} else if (FALSE_IT(generate_ps_sql_id(sql, context))) {
} else if (OB_LIKELY(ObStmt::is_dml_stmt(stmt_type))) {
//if plan not exist, generate plan
ObPlanCacheCtx pc_ctx(sql, true, /*is_ps_mode*/
allocator, context, ectx, session.get_effective_tenant_id());
ObPlanCacheCtx pc_ctx(sql, PC_PS_MODE, allocator, context, ectx,
session.get_effective_tenant_id());
pc_ctx.fp_result_.pc_key_.key_id_ = inner_stmt_id;
pc_ctx.normal_parse_const_cnt_ = ps_params.count();
context.spm_ctx_.bl_key_.db_id_ = session.get_database_id();
pc_ctx.set_is_ps_execute_stage();
pc_ctx.set_is_parameterized_execute();
pc_ctx.set_is_inner_sql(is_inner_sql);
pc_ctx.ab_params_ = ps_ab_params;
if (OB_FAIL(construct_ps_param(ps_params, pc_ctx))) {
LOG_WARN("construct_ps_param failed", K(ret));
if (OB_FAIL(construct_parameterized_params(ps_params, pc_ctx))) {
LOG_WARN("construct parameterized params failed", K(ret));
} else {
if (!use_plan_cache) {
/*do nothing*/
@ -1567,7 +1975,7 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
} else if (!result.get_is_from_plan_cache()) {
pctx->set_original_param_cnt(origin_params_count);
pctx->get_param_store_for_update().reset();
if (OB_FAIL(handle_physical_plan(sql, context, result, pc_ctx, get_plan_err, true))) {
if (OB_FAIL(handle_physical_plan(sql, context, result, pc_ctx, get_plan_err, PC_PS_MODE))) {
if (OB_ERR_PROXY_REROUTE == ret) {
LOG_DEBUG("fail to handle physical plan", K(ret));
} else {
@ -1587,7 +1995,7 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
ParseResult parse_result;
MEMSET(&parse_result, 0, SIZEOF(ParseResult));
if (OB_FAIL(generate_physical_plan(parse_result, NULL, context, result,
false/*is_begin_commit_stmt*/, true))) {
false/*is_begin_commit_stmt*/, PC_PS_MODE))) {
LOG_WARN("generate physical plan failed", K(ret));
} else {
const ObPsSqlMeta &sql_meta = ps_info->get_ps_sql_meta();
@ -1625,7 +2033,7 @@ int ObSql::handle_ps_execute(const ObPsStmtId client_stmt_id,
if (OB_FAIL(ret)) {
} else if (OB_FAIL(generate_physical_plan(
parse_result, NULL, context, result, false /*is_begin_commit_stmt*/, true))) {
parse_result, NULL, context, result, false /*is_begin_commit_stmt*/, PC_PS_MODE))) {
LOG_WARN("generate physical plan failed", K(ret), K(sql), K(stmt_type));
} // TODO 生成物理计划的路径可x需q区分
}
@ -1667,9 +2075,10 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
const uint64_t tenant_id = session->get_effective_tenant_id();
bool use_plan_cache = session->get_local_ob_enable_plan_cache();
context.self_add_plan_ = false;
PlanCacheMode mode = remote_sql_info.use_ps_ ? PC_PS_MODE : PC_TEXT_MODE;
context.cur_sql_ = trimed_stmt;
pc_ctx = new (pc_ctx) ObPlanCacheCtx(trimed_stmt,
remote_sql_info.use_ps_, /*is_ps_mode*/
mode,
allocator,
context,
exec_ctx,
@ -1684,12 +2093,12 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
pc_ctx->fp_result_.pc_key_.key_id_ = 0;
pc_ctx->fp_result_.pc_key_.name_ = trimed_stmt;
pc_ctx->normal_parse_const_cnt_ = remote_sql_info.ps_params_->count();
pc_ctx->set_is_parameterized_execute();
pc_ctx->is_original_ps_mode_ = remote_sql_info.is_original_ps_mode_;
pc_ctx->set_is_ps_execute_stage();
if (OB_FAIL(construct_param_store(*remote_sql_info.ps_params_, param_store))) {
LOG_WARN("construct param store failed", K(ret));
} else if (OB_FAIL(construct_ps_param(param_store, *pc_ctx))) {
LOG_WARN("construct_ps_param failed", K(ret));
} else if (OB_FAIL(construct_parameterized_params(param_store, *pc_ctx))) {
LOG_WARN("construct parameterized params failed", K(ret));
}
} else if (remote_sql_info.is_batched_stmt_) {
//这里保持跟控制端一致,如果是batched stmt,需要先做一次parser的切分
@ -1743,11 +2152,12 @@ int ObSql::handle_remote_query(const ObRemoteSqlInfo &remote_sql_info,
tmp_result.set_exec_context(exec_ctx);
//经过plan cache的计算后,param_store里的值可能会增加,因为plan cache中会执行pre calculation
//这里要再进行计划生成,需要把plan cache中pre calculation加入的param清除掉
int64_t initial_param_count = pc_ctx->fp_result_.ps_params_.count();
int64_t initial_param_count = pc_ctx->fp_result_.parameterized_params_.count();
for (int64_t i = remote_sql_info.ps_params_->count(); i > initial_param_count; --i) {
remote_sql_info.ps_params_->pop_back();
}
if (OB_FAIL(handle_physical_plan(trimed_stmt, context, tmp_result, *pc_ctx, get_plan_err, remote_sql_info.use_ps_))) {
PlanCacheMode mode = remote_sql_info.use_ps_ ? PC_PS_MODE : PC_TEXT_MODE;
if (OB_FAIL(handle_physical_plan(trimed_stmt, context, tmp_result, *pc_ctx, get_plan_err, mode))) {
if (OB_ERR_PROXY_REROUTE == ret) {
LOG_DEBUG("fail to handle physical plan", K(ret));
} else {
@ -1861,7 +2271,7 @@ OB_INLINE int ObSql::handle_text_query(const ObString &stmt, ObSqlCtx &context,
} else {
context.cur_sql_ = trimed_stmt;
pc_ctx = new (buf) ObPlanCacheCtx(trimed_stmt,
false, /*is_ps_mode*/
PC_TEXT_MODE,
allocator,
context,
ectx,
@ -2114,8 +2524,6 @@ int ObSql::generate_stmt(ParseResult &parse_result,
KP(parse_result.result_tree_->children_[0]));
}
if (OB_SUCC(ret)) {
// set # of question marks
if (context.is_prepare_protocol_ && !context.is_prepare_stage_) {
@ -2286,7 +2694,7 @@ int ObSql::generate_physical_plan(ParseResult &parse_result,
ObSqlCtx &sql_ctx,
ObResultSet &result,
const bool is_begin_commit_stmt,
const bool is_ps_mode /* false */,
const PlanCacheMode mode,
ParseResult *outline_parse_result /* null */ )
{
int ret = OB_SUCCESS;
@ -2349,7 +2757,7 @@ int ObSql::generate_physical_plan(ParseResult &parse_result,
} else if (OB_ISNULL(result.get_exec_context().get_stmt_factory()->get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query ctx is null", K(ret));
} else if (OB_FAIL(fill_result_set(result, &sql_ctx, is_ps_mode, *basic_stmt))) {
} else if (OB_FAIL(fill_result_set(result, &sql_ctx, mode, *basic_stmt))) {
LOG_WARN("Failed to fill result set", K(ret));
} else if (OB_FAIL(sql_ctx.session_info_->get_sys_variable(share::SYS_VAR__AGGREGATION_OPTIMIZATION_SETTINGS,
aggregate_setting))) {
@ -2509,7 +2917,7 @@ int ObSql::generate_physical_plan(ParseResult &parse_result,
result.set_cmd(cmd);
result.get_session().set_cur_sql_id(sql_ctx.sql_id_);
if (!is_begin_commit_stmt
&& OB_FAIL(fill_result_set(result, &sql_ctx, is_ps_mode, *basic_stmt))) {
&& OB_FAIL(fill_result_set(result, &sql_ctx, mode, *basic_stmt))) {
LOG_WARN("Failed to fill result set", K(ret));
}
}
@ -2945,7 +3353,8 @@ int ObSql::execute_get_plan(ObPlanCache &plan_cache,
if (OB_ISNULL(session) || OB_ISNULL(pctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is NULL", K(ret));
} else if (pc_ctx.is_ps_mode_) {
} else if (PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_) {
// TODO baixian.zr, change pl mode hit cache as text mode.
ObPsStmtId stmt_id = pc_ctx.fp_result_.pc_key_.key_id_;
guard.init(PS_EXEC_HANDLE);
if (OB_FAIL(plan_cache.get_ps_plan(guard, stmt_id, pc_ctx))) {
@ -3021,7 +3430,7 @@ int ObSql::pc_get_plan(ObPlanCacheCtx &pc_ctx,
|| OB_BATCHED_MULTI_STMT_ROLLBACK == ret
|| OB_NEED_SWITCH_CONSUMER_GROUP == ret) {
/*do nothing*/
} else if (!pc_ctx.is_ps_mode_
} else if (!(PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_)
&& OB_PC_LOCK_CONFLICT == ret
&& !session->is_inner()) {
//不是ps模式, 不是inner sql, 且plan cache锁超时, 后面会放入大查询队列列,
@ -3240,8 +3649,9 @@ int ObSql::parser_and_check(const ObString &outlined_stmt,
pc_ctx.is_rewrite_sql_ ? UDR_SQL_MODE : STD_MODE,
pc_ctx.sql_ctx_.handle_batched_multi_stmt()))) {
LOG_WARN("Generate syntax tree failed", K(outlined_stmt), K(ret));
} else if (pc_ctx.is_ps_mode_
&& OB_FAIL(construct_param_store_from_ps_param(pc_ctx, pctx->get_param_store_for_update()))) {
} else if ((PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_)
&& OB_FAIL(construct_param_store_from_parameterized_params(
pc_ctx, pctx->get_param_store_for_update()))) {
LOG_WARN("construct param store failed", K(ret));
}
if (OB_SUCC(ret)) {
@ -3299,8 +3709,8 @@ int ObSql::parser_and_check(const ObString &outlined_stmt,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(children_node));
//除了普通的dml stmt,explain stmt中存在?也需要这里一起判断
} else if (!pc_ctx.is_ps_mode_ &&
(children_node->type_ == T_EXPLAIN || IS_DML_STMT(children_node->type_))
} else if (!(PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_)
&& (children_node->type_ == T_EXPLAIN || IS_DML_STMT(children_node->type_))
&& (children_node->value_ > 0)) {
ret = OB_ERR_PARSE_SQL;//children_node->value_ > 0,说明具有question_mark
const char *err_msg = "?";
@ -3471,11 +3881,12 @@ int ObSql::pc_add_plan(ObPlanCacheCtx &pc_ctx,
LOG_WARN("tenant config is invalid", K(ret));
} else if (OB_USE_PLAN_CACHE_NONE == phy_plan->get_phy_plan_hint().plan_cache_policy_) {
LOG_DEBUG("Hint not use plan cache");
} else if (OB_FAIL(result.to_plan(pc_ctx.is_ps_mode_, phy_plan))) {
LOG_WARN("Failed copy field to plan", K(ret));
} else if (OB_FAIL(result.to_plan(pc_ctx.mode_, phy_plan))) {
LOG_WARN("Failed copy field to pplan", K(ret));
} else if (OB_FAIL(ob_write_string(phy_plan->get_allocator(),
pc_ctx.is_ps_mode_ ? pc_ctx.raw_sql_ :
pc_ctx.sql_ctx_.spm_ctx_.bl_key_.constructed_sql_,
(PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_)
? pc_ctx.raw_sql_ :
pc_ctx.sql_ctx_.spm_ctx_.bl_key_.constructed_sql_,
phy_plan->stat_.constructed_sql_))) {
LOG_WARN("failed to ob write string", K(ret));
} else if (OB_FAIL(ob_write_string(phy_plan->get_allocator(),
@ -3492,7 +3903,7 @@ int ObSql::pc_add_plan(ObPlanCacheCtx &pc_ctx,
phy_plan->stat_.rule_version_ = rule_mgr->get_rule_version();
phy_plan->stat_.enable_udr_ = tenant_config->enable_user_defined_rewrite_rules;
if (pc_ctx.is_ps_mode_) {
if (PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_) {
//远程SQL第二次进入plan,将raw_sql作为pc_key存入plan cache中,
//然后使用ps接口直接用参数化后的sql作为key来查plan cache,可以节省一次对SQL fast parse的代价
if (pc_ctx.sql_ctx_.is_remote_sql_) {
@ -3780,7 +4191,7 @@ int ObSql::pc_add_udr_plan(const ObUDRItemMgr::UDRItemRefGuard &item_guard,
ObPhysicalPlanCtx *pctx = ectx.get_physical_plan_ctx();
ParamStore param_store( (ObWrapperAllocator(&allocator)) );
const ObString &raw_sql = pc_ctx.raw_sql_;
ObPlanCacheCtx tmp_pc_ctx(raw_sql, pc_ctx.is_ps_mode_,
ObPlanCacheCtx tmp_pc_ctx(raw_sql, pc_ctx.mode_,
allocator, pc_ctx.sql_ctx_, ectx, session.get_effective_tenant_id());
tmp_pc_ctx.fp_result_ = pc_ctx.fp_result_;
tmp_pc_ctx.normal_parse_const_cnt_ = pc_ctx.normal_parse_const_cnt_;
@ -3819,7 +4230,7 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
ObResultSet &result,
ObPlanCacheCtx &pc_ctx,
const int get_plan_err,
bool is_psmode)
PlanCacheMode mode)
{
int ret = OB_SUCCESS;
FLTSpanGuard(hard_parse);
@ -3848,7 +4259,7 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
LOG_DEBUG("gen plan info", K(spm_ctx.bl_key_), K(get_plan_err));
// for batched multi stmt, we only parse and optimize the first statement
// only in multi_query, need do this
if (!is_psmode &&
if (!(PC_PS_MODE == mode || PC_PL_MODE == mode) &&
context.multi_stmt_item_.is_batched_multi_stmt() &&
OB_FAIL(get_first_batched_multi_stmt(context.multi_stmt_item_, outlined_stmt))) {
LOG_WARN("failed to get first batched stmt item", K(ret));
@ -3870,7 +4281,7 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
is_enable_transform_tree))) {
LOG_WARN("fail to parser and check", K(ret));
} else if (context.multi_stmt_item_.is_batched_multi_stmt() &&
!is_psmode &&
!(PC_PS_MODE == mode || PC_PL_MODE == mode) &&
OB_FAIL(check_batched_multi_stmt_after_parser(pc_ctx,
parse_result,
add_plan_to_pc,
@ -3891,7 +4302,7 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
context,
result,
pc_ctx.is_begin_commit_stmt(),
is_psmode,
mode,
&outline_parse_result))) {
if (OB_ERR_PROXY_REROUTE == ret) {
LOG_DEBUG("Failed to generate plan", K(ret));
@ -4140,7 +4551,8 @@ void ObSql::generate_sql_id(ObPlanCacheCtx &pc_ctx,
&& parse_result.result_tree_->children_[0]->type_ == T_SP_CALL_STMT) {
signature_sql = pc_ctx.sql_ctx_.spm_ctx_.bl_key_.constructed_sql_;
} else if (add_plan_to_pc == false
|| pc_ctx.is_ps_mode_
|| PC_PS_MODE == pc_ctx.mode_
|| PC_PL_MODE == pc_ctx.mode_
|| OB_SUCCESS != err_code) {
signature_sql = pc_ctx.raw_sql_;
} else {