[FEAT MERGE]:develop pl feature of 4.2 version

Co-authored-by: LiuYoung00 <liuyanglo_ol@163.com>
Co-authored-by: 0xacc <heyongyi1998@gmail.com>
Co-authored-by: seuwebber <webber_code@163.com>
This commit is contained in:
obdev
2023-04-27 16:08:10 +08:00
committed by ob-robot
parent 8e9c9d0c5f
commit 57f1c6e7ee
92 changed files with 3534 additions and 1304 deletions

View File

@ -68,6 +68,7 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
int ret = OB_SUCCESS;
uint64_t package_id = OB_INVALID_ID;
uint64_t routine_id = OB_INVALID_ID;
ObCallProcedureInfo *call_proc_info = NULL;
LOG_DEBUG("call procedure execute", K(stmt));
if (OB_ISNULL(ctx.get_pl_engine()) || OB_ISNULL(ctx.get_output_row())) {
ret = OB_ERR_UNEXPECTED;
@ -89,27 +90,42 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
ctx.get_my_session()->get_current_query_string(),
ctx.get_stmt_factory()->get_query_ctx()->get_sql_stmt()))) {
LOG_WARN("fail to set query string");
} else if (OB_ISNULL(call_proc_info = stmt.get_call_proc_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("call procedure info is null", K(ret));
} else {
ParamStore params( (ObWrapperAllocator(ctx.get_allocator())) );
const share::schema::ObRoutineInfo *routine_info = NULL;
if (!stmt.can_direct_use_param()) {
if (!call_proc_info->can_direct_use_param()) {
ObObjParam param;
for (int64_t i = 0; OB_SUCC(ret) && i < stmt.get_params().count(); ++i) {
const ObRawExpr *expr = stmt.get_params().at(i);
const ParamStore &origin_params = ctx.get_physical_plan_ctx()->get_param_store_for_update();
pl::ExecCtxBak exec_ctx_bak;
sql::ObPhysicalPlanCtx phy_plan_ctx(ctx.get_allocator());
phy_plan_ctx.set_timeout_timestamp(ctx.get_physical_plan_ctx()->get_timeout_timestamp());
exec_ctx_bak.backup(ctx);
ctx.set_physical_plan_ctx(&phy_plan_ctx);
if (call_proc_info->get_expr_op_size() > 0) {
OZ (ctx.init_expr_op(call_proc_info->get_expr_op_size()));
}
OZ (call_proc_info->get_frame_info().pre_alloc_exec_memory(ctx));
for (int64_t i = 0; OB_SUCC(ret) && i < origin_params.count(); ++i) {
OZ (phy_plan_ctx.get_param_store_for_update().push_back(origin_params.at(i)));
}
for (int64_t i = 0; OB_SUCC(ret) && i < call_proc_info->get_expressions().count(); ++i) {
const ObSqlExpression *expr = call_proc_info->get_expressions().at(i);
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("param expr NULL", K(i), K(ret));
} else {
param.reset();
param.ObObj::reset();
param.set_accuracy(expr->get_accuracy());
param.set_result_flag(expr->get_result_flag());
if (OB_FAIL(ObSQLUtils::calc_raw_expr_without_row(ctx, expr, param,
&(ctx.get_physical_plan_ctx()->get_param_store_for_update()), ctx.get_allocator()))) {
if (OB_FAIL(ObSQLUtils::calc_sql_expression_without_row(ctx, *expr, param))) {
LOG_WARN("failed to calc exec param expr", K(i), K(*expr), K(ret));
} else {
if (expr->has_flag(ObExprInfoFlag::IS_PL_MOCK_DEFAULT_EXPR)) {
if (expr->get_is_pl_mock_default_expr()) {
param.set_is_pl_mock_default_param(true);
}
if (OB_FAIL(params.push_back(param))) {
@ -120,12 +136,18 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
}
}
} // for end
if (call_proc_info->get_expr_op_size() > 0) {
ctx.reset_expr_op();
ctx.get_allocator().free(ctx.get_expr_op_ctx_store());
}
exec_ctx_bak.restore(ctx);
} else {
LOG_DEBUG("direct use params", K(ret), K(stmt));
int64_t param_cnt = ctx.get_physical_plan_ctx()->get_param_store().count();
if (stmt.get_param_cnt() != param_cnt) {
if (call_proc_info->get_param_cnt() != param_cnt) {
ret = OB_ERR_SP_WRONG_ARG_NUM;
LOG_WARN("argument number not equal", K(stmt.get_param_cnt()), K(param_cnt), K(ret));
LOG_WARN("argument number not equal", K(call_proc_info->get_param_cnt()), K(param_cnt), K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
LOG_DEBUG("params", "param", ctx.get_physical_plan_ctx()->get_param_store().at(i), K(i));
@ -135,14 +157,14 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
}
}
if (OB_SUCC(ret)) {
package_id = stmt.get_package_id();
routine_id = stmt.get_routine_id();
package_id = call_proc_info->get_package_id();
routine_id = call_proc_info->get_routine_id();
}
if (OB_SUCC(ret)) {
ObArray<int64_t> path;
ObArray<int64_t> nocopy_params;
ObObj result;
int64_t pkg_id = stmt.is_udt_routine()
int64_t pkg_id = call_proc_info->is_udt_routine()
? share::schema::ObUDTObjectType::mask_object_id(package_id) : package_id;
if (OB_FAIL(ctx.get_pl_engine()->execute(ctx,
ctx.get_allocator(),
@ -160,20 +182,20 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
}
}
if (OB_FAIL(ret)) {
} else if (stmt.get_output_count() > 0) {
ctx.get_output_row()->count_ = stmt.get_output_count();
} else if (call_proc_info->get_output_count() > 0) {
ctx.get_output_row()->count_ = call_proc_info->get_output_count();
if (OB_ISNULL(ctx.get_output_row()->cells_ = static_cast<ObObj *>(
ctx.get_allocator().alloc(sizeof(ObObj) * stmt.get_output_count())))) {
ctx.get_allocator().alloc(sizeof(ObObj) * call_proc_info->get_output_count())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc obj array", K(stmt.get_output_count()), K(ret));
LOG_WARN("fail to alloc obj array", K(call_proc_info->get_output_count()), K(ret));
} else {
int64_t idx = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < params.count(); ++i) {
if (stmt.is_out_param(i)) {
if (call_proc_info->is_out_param(i)) {
if (ob_is_enum_or_set_type(params.at(i).get_type())) {
OZ (ObSPIService::cast_enum_set_to_string(
ctx,
stmt.get_out_type().at(idx).get_type_info(),
call_proc_info->get_out_type().at(idx).get_type_info(),
params.at(i),
ctx.get_output_row()->cells_[idx]));
OX (idx++);
@ -183,30 +205,26 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
}
if (OB_FAIL(ret)) {
} else if (!stmt.can_direct_use_param()) {
const ObRawExpr *expr = stmt.get_params().at(i);
if (OB_LIKELY(expr->is_const_raw_expr())) {
const ObConstRawExpr *const_expr = static_cast<const ObConstRawExpr *>(expr);
if (T_QUESTIONMARK == const_expr->get_expr_type()) {
int64_t idx = const_expr->get_value().get_unknown();
} else if (!call_proc_info->can_direct_use_param()) {
const ObSqlExpression *expr = call_proc_info->get_expressions().at(i);
ObItemType expr_type = expr->get_expr_items().at(0).get_item_type();
if (OB_LIKELY(IS_CONST_TYPE(expr_type))) {
const ObObj &value = expr->get_expr_items().at(0).get_obj();
if (T_QUESTIONMARK == expr_type) {
int64_t idx = value.get_unknown();
ctx.get_physical_plan_ctx()->get_param_store_for_update().at(idx) = params.at(i);
} else {
/* do nothing */
}
} else if (T_OP_GET_USER_VAR == expr->get_expr_type()) { //这里只有可能出现用户变量
const ObSysFunRawExpr *func_expr = static_cast<const ObSysFunRawExpr*>(expr);
} else if (T_OP_GET_USER_VAR == expr_type) { //这里只有可能出现用户变量
ObExprCtx expr_ctx;
if (OB_FAIL(ObSQLUtils::wrap_expr_ctx(stmt.get_stmt_type(), ctx, ctx.get_allocator(), expr_ctx))) {
if (expr->get_expr_items().count() < 2 || T_VARCHAR != expr->get_expr_items().at(1).get_item_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected result expr", K(*expr), K(ret));
} else if (OB_FAIL(ObSQLUtils::wrap_expr_ctx(stmt.get_stmt_type(), ctx, ctx.get_allocator(), expr_ctx))) {
LOG_WARN("Failed to wrap expr ctx", K(ret));
} else if (OB_ISNULL(func_expr->get_param_expr(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sys var is NULL", K(*func_expr), K(ret));
} else if (OB_UNLIKELY(!func_expr->get_param_expr(0)->is_const_raw_expr()
|| !static_cast<const ObConstRawExpr*>(func_expr->get_param_expr(0))->get_value().is_varchar())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid sys var", K(*func_expr->get_param_expr(0)), K(ret));
} else {
const ObString var_name = static_cast<const ObConstRawExpr*>(func_expr->get_param_expr(0))->get_value().get_varchar();
const ObString var_name = expr->get_expr_items().at(1).get_obj().get_string();
if (OB_FAIL(ObVariableSetExecutor::set_user_variable(params.at(i), var_name, expr_ctx))) {
LOG_WARN("set user variable failed", K(ret));
}

View File

@ -59,10 +59,7 @@ int ObCreateTriggerExecutor::execute(ObExecContext &ctx, ObCreateTriggerStmt &st
ctx.get_allocator(),
arg));
if (OB_SUCC(ret)) {
if (lib::is_oracle_mode() ||
(lib::is_mysql_mode() && arg.error_info_.get_error_status() != ERROR_STATUS_NO_ERROR)) {
OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server());
}
OZ (common_rpc_proxy->create_trigger(arg), common_rpc_proxy->get_server());
}
OZ (ctx.get_sql_ctx()->schema_guard_->reset());
return ret;
@ -134,7 +131,8 @@ int ObCreateTriggerExecutor::analyze_dependencies(ObSchemaGetterGuard &schema_gu
if (OB_SUCC(ret)) {
arg.trigger_info_.deep_copy(*trigger_info);
arg.error_info_.collect_error_info(&arg.trigger_info_);
arg.for_insert_errors_ = true;
arg.in_second_stage_ = true;
arg.with_replace_ = true;
}
}
return ret;