diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h index 36b9105add..e2794c45e2 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h @@ -133,7 +133,8 @@ public: ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) = 0; + const ObTimeZoneInfo *tz_info, + ObObj *result) = 0; virtual int prepare(const char *sql) { UNUSED(sql); return OB_NOT_SUPPORTED; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp index c24e85c94f..0171f0885a 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp @@ -517,7 +517,8 @@ int ObMySQLConnection::execute_proc(const uint64_t tenant_id, ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) + const ObTimeZoneInfo *tz_info, + ObObj *result) { int ret = OB_SUCCESS; if (OB_UNLIKELY(closed_)) { diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h index cac6b64412..f8734b6150 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h @@ -90,7 +90,8 @@ public: ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) override; + const ObTimeZoneInfo *tz_info, + ObObj *result) override; virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) override; virtual int rollback() override; virtual int commit() override; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp index aed12be293..7bb11b44ef 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp @@ -738,14 +738,15 @@ int ObDbLinkProxy::dblink_execute_proc(const uint64_t tenant_id, ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) + const ObTimeZoneInfo *tz_info, + ObObj *result) { int ret = OB_SUCCESS; if (OB_ISNULL(dblink_conn) || sql.empty()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("null ptr", K(ret), KP(dblink_conn), K(sql)); } else if (OB_FAIL(dblink_conn->execute_proc(tenant_id, allocator, params, sql, - routine_info, udts, tz_info))) { + routine_info, udts, tz_info, result))) { LOG_WARN("call procedure to dblink failed", K(ret), K(dblink_conn), K(sql)); } else { LOG_DEBUG("succ to call procedure by dblink", K(sql)); diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h index c614fd5b71..e446618c97 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h @@ -203,7 +203,8 @@ public: ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info); + const ObTimeZoneInfo *tz_info, + ObObj *result); int dblink_prepare(sqlclient::ObISQLConnection *dblink_conn, const char *sql); int dblink_bind_basic_type_by_pos(sqlclient::ObISQLConnection *dblink_conn, uint64_t position, diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index 54a96fbac1..86a6731389 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -1428,9 +1428,10 @@ int ObInnerSQLConnection::execute_proc(const uint64_t tenant_id, ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) + const ObTimeZoneInfo *tz_info, + ObObj *result) { - UNUSEDx(tenant_id, allocator, params, sql, routine_info, udts, tz_info); + UNUSEDx(tenant_id, allocator, params, sql, routine_info, udts, tz_info, result); int ret = OB_SUCCESS; return ret; } diff --git a/src/observer/ob_inner_sql_connection.h b/src/observer/ob_inner_sql_connection.h index a5317b6d25..f45efa9e73 100644 --- a/src/observer/ob_inner_sql_connection.h +++ b/src/observer/ob_inner_sql_connection.h @@ -163,7 +163,8 @@ public: ObString &sql, const share::schema::ObRoutineInfo &routine_info, const common::ObIArray &udts, - const ObTimeZoneInfo *tz_info) override; + const ObTimeZoneInfo *tz_info, + ObObj *result) override; virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) override; virtual int register_multi_data_source(const uint64_t &tenant_id, const share::ObLSID ls_id, diff --git a/src/pl/dblink/ob_pl_dblink_guard.cpp b/src/pl/dblink/ob_pl_dblink_guard.cpp index 0bca5cbd1b..01c8696f6e 100644 --- a/src/pl/dblink/ob_pl_dblink_guard.cpp +++ b/src/pl/dblink/ob_pl_dblink_guard.cpp @@ -277,21 +277,6 @@ int ObPLDbLinkGuard::get_dblink_routine_infos(common::ObDbLinkProxy *dblink_prox routine_name, routine_infos, next_link_object_id_)); - if (OB_SUCC(ret)) { - bool is_all_func = true; - for (int64_t i = 0; OB_SUCC(ret) && i < routine_infos.count(); i++) { - const ObRoutineInfo *r = static_cast(routine_infos.at(i)); - CK (OB_NOT_NULL(r)); - if (OB_SUCC(ret) && ObRoutineType::ROUTINE_PROCEDURE_TYPE == r->get_routine_type()) { - is_all_func = false; - break; - } - } - if (OB_SUCC(ret) && is_all_func) { - ret = OB_ERR_NOT_VALID_ROUTINE_NAME; - LOG_WARN("ORA-06576: not a valid function or procedure name", K(ret), K(pkg_name), K(routine_name)); - } - } #endif return ret; } @@ -398,13 +383,23 @@ int ObPLDbLinkGuard::dblink_name_resolve(common::ObDbLinkProxy *dblink_proxy, BIND_BASIC_BY_POS(7, &part1_type, static_cast(sizeof(int)), oci_sql_int); if (FAILEDx(dblink_proxy->dblink_execute_proc(dblink_conn))) { const DblinkDriverProto link_type = static_cast(dblink_schema->get_driver_proto()); - LOG_WARN("read link failed", K(ret), K(ObString(call_proc))); + if (OB_ERR_ILL_OBJ_FLAG == ret) { + ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS; + LOG_WARN("invalid identifier", K(ret), K(full_name_copy)); + LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, full_name_copy.length(), full_name_copy.ptr()); + } else { + LOG_WARN("read link failed", K(ret), K(ObString(call_proc))); + } } else { switch (part1_type) { case OracleObjectType::ORA_PROCEUDRE: // procedure object_type = static_cast(ObObjectType::PROCEDURE); break; + case OracleObjectType::ORA_FUNCTION: + // function + object_type = static_cast(ObObjectType::FUNCTION); + break; case OracleObjectType::ORA_PACKAGE: // package object_type = static_cast(ObObjectType::PACKAGE); diff --git a/src/pl/ob_pl.cpp b/src/pl/ob_pl.cpp index 59e43ad960..21d59203ab 100644 --- a/src/pl/ob_pl.cpp +++ b/src/pl/ob_pl.cpp @@ -50,6 +50,7 @@ #include "pl/pl_cache/ob_pl_cache_mgr.h" #include "sql/engine/dml/ob_trigger_handler.h" #include "pl/ob_pl_allocator.h" +#include "sql/dblink/ob_tm_service.h" namespace oceanbase { using namespace common; @@ -302,19 +303,6 @@ int ObPL::execute_proc(ObPLExecCtx &ctx, } } if (OB_FAIL(ret)) { -#ifdef OB_BUILD_ORACLE_PL - } else if (OB_INVALID_ID != dblink_id) { - if (OB_FAIL(ObSPIService::spi_execute_dblink(&ctx, dblink_id, package_id, proc_id, proc_params))) { - LOG_WARN("execute dblink routine failed", K(ret)); - } - if (OB_SUCC(ret)) { - if (NULL != argv && argc > 0) { - for (int64_t i = 0; OB_SUCC(ret) && i < argc; ++i) { - *argv[i] = proc_params.at(i); - } - } - } -#endif } else { share::schema::ObSchemaGetterGuard schema_guard; const uint64_t tenant_id = ctx.exec_ctx_->get_my_session()->get_effective_tenant_id(); @@ -336,7 +324,9 @@ int ObPL::execute_proc(ObPLExecCtx &ctx, ctx.status_, true, ctx.in_function_, - loc))) { + loc, + false, + dblink_id))) { LOG_WARN("failed to execute pl", K(ret), K(package_id), K(proc_id), K(ctx.in_function_)); } } catch (...) { @@ -607,28 +597,28 @@ int ObPLContext::init(ObSQLSessionInfo &session_info, ObExecContext &ctx, ObPLFunction *routine, bool is_function_or_trigger, - ObIAllocator *allocator) + ObIAllocator *allocator, + const bool is_dblink) { int ret = OB_SUCCESS; int64_t pl_block_timeout = 0; int64_t query_start_time = session_info.get_query_start_time(); - CK (OB_NOT_NULL(routine)); - OX (is_function_or_trigger |= routine->is_function()); - OX (is_autonomous_ = routine->is_autonomous()); - OX (is_function_or_trigger_ = is_function_or_trigger); + if (!is_dblink) { + CK (OB_NOT_NULL(routine)); + OX (is_function_or_trigger |= routine->is_function()); + OX (is_autonomous_ = routine->is_autonomous()); + OX (is_function_or_trigger_ = is_function_or_trigger); + } OZ (session_info.get_pl_block_timeout(pl_block_timeout)); - if (OB_SUCC(ret) && pl_block_timeout > OB_MAX_USER_SPECIFIED_TIMEOUT) { - pl_block_timeout = OB_MAX_USER_SPECIFIED_TIMEOUT; - } + OX (pl_block_timeout = std::min(pl_block_timeout, OB_MAX_USER_SPECIFIED_TIMEOUT)); if (is_function_or_trigger && (ObTimeUtility::current_time() + pl_block_timeout) < THIS_WORKER.get_timeout_ts()) { OX (old_worker_timeout_ts_ = THIS_WORKER.get_timeout_ts()); OX (THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + pl_block_timeout)); if (OB_SUCC(ret) && OB_NOT_NULL(ctx.get_physical_plan_ctx())) { old_phy_plan_timeout_ts_ = ctx.get_physical_plan_ctx()->get_timeout_timestamp(); - ctx.get_physical_plan_ctx() - ->set_timeout_timestamp(ObTimeUtility::current_time() + pl_block_timeout); + ctx.get_physical_plan_ctx()->set_timeout_timestamp(ObTimeUtility::current_time() + pl_block_timeout); } } if (OB_ISNULL(session_info.get_pl_context())) { @@ -684,7 +674,8 @@ int ObPLContext::init(ObSQLSessionInfo &session_info, } } - OZ (ob_write_string(allocator != NULL ? *allocator : ctx.get_allocator(), session_info.get_current_query_string(), cur_query_)); + OZ (ob_write_string(allocator != NULL ? *allocator + : ctx.get_allocator(), session_info.get_current_query_string(), cur_query_)); OZ (recursion_ctx_.init(session_info)); OX (session_info.set_pl_stack_ctx(this)); @@ -815,6 +806,12 @@ void ObPLContext::destory( ret = OB_SUCCESS == ret ? OB_ERR_UNEXPECTED : ret; LOG_ERROR("current stack ctx is top, but session info is not", K(ret)); } else { +#define IS_DBLINK_TRANS \ + transaction::ObTxDesc *tx_desc = session_info.get_tx_desc(); \ + const transaction::ObXATransID xid = session_info.get_xid(); \ + const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid); \ + bool is_dblink = (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type); + if (!in_nested_sql_ctrl() && lib::is_mysql_mode() && is_function_or_trigger_ && OB_SUCCESS == ret && @@ -843,7 +840,18 @@ void ObPLContext::destory( } #ifdef OB_BUILD_ORACLE_PL } else if (session_info.associated_xa()) { - if (OB_TRANS_XA_BRANCH_FAIL != ret) { + IS_DBLINK_TRANS; + if (is_dblink) { + transaction::ObTransID tx_id; + const bool force_disconnect = false; + int rl_ret = OB_SUCCESS; + if (OB_SUCCESS != (rl_ret = ObTMService::tm_rollback(ctx, tx_id))) { + LOG_WARN("failed to rollback for dblink trans", K(ret), K(rl_ret), K(tx_id), K(xid), K(global_tx_type)); + } else if (OB_SUCCESS != (rl_ret = session_info.get_dblink_context().clean_dblink_conn(force_disconnect))) { + LOG_WARN("dblink trans failed to release dblink connections", K(ret), K(rl_ret), K(tx_id), K(xid)); + } + ret = OB_SUCCESS == ret ? rl_ret : ret; + } else if (OB_TRANS_XA_BRANCH_FAIL != ret) { tmp_ret = ObDbmsXA::xa_rollback_savepoint(ctx); if (OB_SUCCESS != tmp_ret) { LOG_WARN("xa trans roll back to save point failed", @@ -893,6 +901,21 @@ void ObPLContext::destory( tmp_ret = implicit_end_trans(session_info, ctx, ret != OB_SUCCESS); } ret = OB_SUCCESS == ret ? tmp_ret : ret; + } else { + // in XA trans, check whether the trans is dblink trans. + int cm_ret = OB_SUCCESS; + IS_DBLINK_TRANS; + if (is_dblink) { + transaction::ObTransID tx_id; + const bool force_disconnect = false; + if (OB_SUCCESS !=(cm_ret = ObTMService::tm_commit(ctx, tx_id))) { + LOG_WARN("failed to commit dblink trans", K(ret), K(tx_id), K(xid), K(global_tx_type)); + } else if (OB_SUCCESS != (cm_ret = session_info.get_dblink_context().clean_dblink_conn(force_disconnect))) { + LOG_WARN("dblink trans failed to release dblink connections", K(ret), K(cm_ret), K(tx_id), K(xid)); + } + ret = OB_SUCCESS == ret ? cm_ret : ret; + } + ctx.set_need_disconnect(false); } } @@ -916,6 +939,7 @@ void ObPLContext::destory( session_info_ = NULL; IGNORE_RETURN ObPLContext::debug_stop(&session_info); +#undef IS_DBLINK_TRANS } else if (lib::is_mysql_mode() && is_function_or_trigger_) { // 非嵌套场景: 内层udf一定是在表达式里面, 提交由spi_calc_expr处来保证 // 嵌套场景: 内层udf被dml语句触发, 回滚或提交由外层dml语句保证 @@ -1940,7 +1964,8 @@ int ObPL::execute(ObExecContext &ctx, bool inner_call, bool in_function, uint64_t loc, - bool is_called_from_sql) + bool is_called_from_sql, + uint64_t dblink_id) { int ret = OB_SUCCESS; DISABLE_SQL_MEMLEAK_GUARD; @@ -1977,81 +2002,89 @@ int ObPL::execute(ObExecContext &ctx, // NOTE: need save current stmt type avoid PL-Compile corrupt session.stmt_type auto saved_stmt_type = ctx.get_my_session()->get_stmt_type(); - OZ (get_pl_function(ctx, - *ctx.get_package_guard(), - package_id, - routine_id, - subprogram_path, - cacheobj_guard, - local_routine), - K(routine_id), K(subprogram_path)); - // restore work timeout - if (old_worker_timeout_ts != 0) { - THIS_WORKER.set_timeout_ts(old_worker_timeout_ts); - } - // if the routine comes from local, guard needn't to manage it. if (OB_FAIL(ret)) { - } else if (OB_NOT_NULL(local_routine)) { - routine = local_routine; - } else { - routine = static_cast(cacheobj_guard.get_cache_obj()); - } - CK (OB_NOT_NULL(routine)); - CK (OB_NOT_NULL(ctx.get_my_session())); - OZ (ObPLContext::check_routine_legal(*routine, in_function, - ctx.get_my_session()->is_for_trigger_package())); - OZ (check_trigger_arg(params, *routine)); - if (OB_SUCC(ret) && ctx.get_my_session()->is_pl_debug_on()) { - int tmp_ret = OB_SUCCESS; - bool need_check = true; - ObPLContext *pl_ctx = ctx.get_my_session()->get_pl_context(); - if (OB_NOT_NULL(pl_ctx)) { - ObIArray &stack = pl_ctx->get_exec_stack(); - if (stack.count() > 0) { - pl::ObPLExecState *frame = stack.at(stack.count() - 1); - // look into caller, if caller hasn't debug priv, the callee also has not - if (OB_NOT_NULL(frame) && !(frame->get_function().has_debug_priv())) { - need_check = false; + } else if (!is_valid_id(dblink_id)) { + OZ (get_pl_function(ctx, + *ctx.get_package_guard(), + package_id, + routine_id, + subprogram_path, + cacheobj_guard, + local_routine), + K(routine_id), K(subprogram_path)); + // restore work timeout + if (old_worker_timeout_ts != 0) { + THIS_WORKER.set_timeout_ts(old_worker_timeout_ts); + } + // if the routine comes from local, guard needn't to manage it. + if (OB_FAIL(ret)) { + } else if (OB_NOT_NULL(local_routine)) { + routine = local_routine; + } else { + routine = static_cast(cacheobj_guard.get_cache_obj()); + } + CK (OB_NOT_NULL(routine)); + CK (OB_NOT_NULL(ctx.get_my_session())); + OZ (ObPLContext::check_routine_legal(*routine, in_function, + ctx.get_my_session()->is_for_trigger_package())); + OZ (check_trigger_arg(params, *routine)); + if (OB_SUCC(ret) && ctx.get_my_session()->is_pl_debug_on()) { + int tmp_ret = OB_SUCCESS; + bool need_check = true; + ObPLContext *pl_ctx = ctx.get_my_session()->get_pl_context(); + if (OB_NOT_NULL(pl_ctx)) { + ObIArray &stack = pl_ctx->get_exec_stack(); + if (stack.count() > 0) { + pl::ObPLExecState *frame = stack.at(stack.count() - 1); + // look into caller, if caller hasn't debug priv, the callee also has not + if (OB_NOT_NULL(frame) && !(frame->get_function().has_debug_priv())) { + need_check = false; + } + } + } else { + } + bool is_nested_routine = OB_NOT_NULL(local_routine) && (!subprogram_path.empty()); + // routine default has not debug priv, if a routine is not a nested routine, we check it to see + // if it has debug priv, and set or clear debug flag. + if (need_check) { + if (!is_nested_routine) { + tmp_ret = ObPLContext::check_debug_priv(ctx.get_sql_ctx()->schema_guard_, + ctx.get_my_session(), routine); + } else { + // a nested routine debug priv same as caller, because a nested routine cann't be called + // from outside of this routine, to be here, we can see that the caller has debug priv + // or the need_check flag is not true; + routine->set_debug_priv(); } } - } else { } - bool is_nested_routine = OB_NOT_NULL(local_routine) && (!subprogram_path.empty()); - // routine default has not debug priv, if a routine is not a nested routine, we check it to see - // if it has debug priv, and set or clear debug flag. - if (need_check) { - if (!is_nested_routine) { - tmp_ret = ObPLContext::check_debug_priv(ctx.get_sql_ctx()->schema_guard_, - ctx.get_my_session(), routine); - } else { - // a nested routine debug priv same as caller, because a nested routine cann't be called - // from outside of this routine, to be here, we can see that the caller has debug priv - // or the need_check flag is not true; - routine->set_debug_priv(); - } + if (OB_SUCC(ret) && !ObUDTObjectType::is_object_id(package_id)) { + OZ (check_exec_priv(ctx, routine)); } } - if (OB_SUCC(ret) && !ObUDTObjectType::is_object_id(package_id)) { - OZ (check_exec_priv(ctx, routine)); - } // prepare it ... OZ (stack_ctx.init(*(ctx.get_my_session()), ctx, routine, in_function || (package_id != OB_INVALID_ID && ObTriggerInfo::is_trigger_package_id(package_id)), - &allocator)); - OZ (stack_ctx.inc_and_check_depth(package_id, routine_id, routine->is_function())); - OZ (stack_ctx.set_exec_env(*routine)); - OZ (stack_ctx.set_default_database(*routine, *(ctx.get_sql_ctx()->schema_guard_))); - OZ (stack_ctx.set_role_id_array(*routine, *(ctx.get_sql_ctx()->schema_guard_))); + &allocator, + is_valid_id(dblink_id))); + if (OB_SUCC(ret) && !is_valid_id(dblink_id)) { + OZ (stack_ctx.inc_and_check_depth(package_id, routine_id, routine->is_function())); + OZ (stack_ctx.set_exec_env(*routine)); + OZ (stack_ctx.set_default_database(*routine, *(ctx.get_sql_ctx()->schema_guard_))); + OZ (stack_ctx.set_role_id_array(*routine, *(ctx.get_sql_ctx()->schema_guard_))); + } #define UNPREPARE() \ if (stack_ctx.is_inited()) { \ - stack_ctx.reset_exec_env(ret); \ - stack_ctx.reset_default_database(ret); \ - stack_ctx.reset_role_id_array(ret); \ - stack_ctx.dec_and_check_depth(package_id, routine_id, ret); \ + if (!is_valid_id(dblink_id)) { \ + stack_ctx.reset_exec_env(ret); \ + stack_ctx.reset_default_database(ret); \ + stack_ctx.reset_role_id_array(ret); \ + stack_ctx.dec_and_check_depth(package_id, routine_id, ret); \ + } \ stack_ctx.destory(*ctx.get_my_session(), ctx, ret); \ } \ if (NULL != routine) routine->clean_debug_priv(); \ @@ -2064,20 +2097,27 @@ int ObPL::execute(ObExecContext &ctx, ctx.get_my_session()->set_stmt_type(saved_stmt_type); try { // execute it ... - OZ (execute(ctx, - allocator, - *(ctx.get_package_guard()), - *routine, - ¶ms, - ((0 == nocopy_params.count()) ? NULL : &nocopy_params), - &result, - status, - stack_ctx.is_top_stack(), - inner_call, - routine->is_function() || in_function, - false, - loc, - is_called_from_sql)); + if (OB_FAIL(ret)) { +#ifdef OB_BUILD_ORACLE_PL + } else if (is_valid_id(dblink_id)) { + OZ (ObSPIService::spi_execute_dblink(ctx, allocator, dblink_id, package_id, routine_id, params, &result)); +#endif + } else { + OZ (execute(ctx, + allocator, + *(ctx.get_package_guard()), + *routine, + ¶ms, + ((0 == nocopy_params.count()) ? NULL : &nocopy_params), + &result, + status, + stack_ctx.is_top_stack(), + inner_call, + routine->is_function() || in_function, + false, + loc, + is_called_from_sql)); + } } catch (...) { LOG_WARN("failed to execute it", K(ret), K(package_id), K(routine_id), K(subprogram_path)); UNPREPARE(); @@ -4289,7 +4329,7 @@ int ObPLINS::init_complex_obj(ObIAllocator &allocator, } else { int64_t init_size = OB_INVALID_SIZE; int64_t member_ptr = 0; - OZ (record_type->get_member(i)->get_size(*this, PL_TYPE_INIT_SIZE, init_size)); + OZ (record_type->get_member(i)->get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (record_type->get_member(i)->newx(allocator, this, member_ptr)); OX (member->set_extend(member_ptr, record_type->get_member(i)->get_type(), init_size)); } @@ -4368,7 +4408,7 @@ int ObPLINS::get_size(ObPLTypeSize type, CK (pl_type.is_composite_type()); OZ (get_user_type(pl_type.get_user_type_id(), user_type, allocator)); OV (OB_NOT_NULL(user_type), OB_ERR_UNEXPECTED, K(pl_type)); - OZ (user_type->get_size(*this, type, size)); + OZ (user_type->get_size(type, size)); return ret; } diff --git a/src/pl/ob_pl.h b/src/pl/ob_pl.h index 34ee31afaf..afd059399c 100644 --- a/src/pl/ob_pl.h +++ b/src/pl/ob_pl.h @@ -888,8 +888,12 @@ public: int is_inited() { return session_info_ != NULL; } - int init(sql::ObSQLSessionInfo &session_info, sql::ObExecContext &ctx, - ObPLFunction *routine, bool is_function_or_trigger, ObIAllocator *allocator = NULL); + int init(sql::ObSQLSessionInfo &session_info, + sql::ObExecContext &ctx, + ObPLFunction *routine, + bool is_function_or_trigger, + ObIAllocator *allocator = NULL, + const bool is_dblink = false); void destory(sql::ObSQLSessionInfo &session_info, sql::ObExecContext &ctx, int &ret); inline ObPLCursorInfo& get_cursor_info() { return cursor_info_; } @@ -1128,7 +1132,8 @@ public: bool inner_call = false, bool in_function = false, uint64_t loc = 0, - bool is_called_from_sql = false); + bool is_called_from_sql = false, + uint64_t dblink_id = OB_INVALID_ID); int check_exec_priv(sql::ObExecContext &ctx, ObPLFunction *routine); private: diff --git a/src/pl/ob_pl_code_generator.cpp b/src/pl/ob_pl_code_generator.cpp index ce6a24fbd9..90bc011da4 100644 --- a/src/pl/ob_pl_code_generator.cpp +++ b/src/pl/ob_pl_code_generator.cpp @@ -1504,7 +1504,7 @@ int ObPLCodeGenerateVisitor::visit(const ObPLReturnStmt &s) OZ (args.push_back(type_id)); OZ (generator_.get_helper().get_int64(OB_INVALID_INDEX, var_idx)); OZ (args.push_back(var_idx)); - OZ (user_type->get_size(*s.get_namespace(), PL_TYPE_INIT_SIZE, init_size)); + OZ (user_type->get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (generator_.get_helper().get_int32(init_size, init_value)); OZ (args.push_back(init_value)); OZ (generator_.get_helper().get_llvm_type(ObIntType, int_type)); @@ -1846,7 +1846,7 @@ int ObPLCodeGenerateVisitor::visit(const ObPLExecuteStmt &s) CK (OB_NOT_NULL(user_type \ = s.get_namespace()->get_type_table()->get_external_type(type_id))); \ } \ - OZ (user_type->get_size(*s.get_namespace(), PL_TYPE_ROW_SIZE, row_size)); \ + OZ (user_type->get_size(PL_TYPE_ROW_SIZE, row_size)); \ OZ (user_type->get_field_count(*s.get_namespace(), filed_cnt)); \ if (OB_SUCC(ret) \ && ObObjAccessIdx::is_package_variable(access_expr->get_access_idxs())) { \ @@ -2635,7 +2635,7 @@ int ObPLCodeGenerateVisitor::visit(const ObPLCallStmt &s) ObLLVMValue init_value; ObLLVMValue extend_value; OZ (generator_.get_helper().get_int8(pl_type->get_type(), var_type)); - OZ (pl_type->get_size(*s.get_namespace(), PL_TYPE_INIT_SIZE, init_size)); + OZ (pl_type->get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (generator_.get_helper().get_int32(init_size, init_value)); OZ (generator_.get_helper().get_llvm_type(ObIntType, int_type)); OZ (generator_.get_helper().create_ptr_to_int(ObString("cast_ptr_to_int64"), diff --git a/src/pl/ob_pl_package_state.cpp b/src/pl/ob_pl_package_state.cpp index 45589f3323..bfc0258722 100644 --- a/src/pl/ob_pl_package_state.cpp +++ b/src/pl/ob_pl_package_state.cpp @@ -272,16 +272,17 @@ int ObPLPackageState::make_pkg_var_kv_value(ObPLExecCtx &ctx, ObObj &var_val, in ret = OB_ERR_UNEXPECTED; LOG_WARN("sql session is null.", K(ret)); } else { - pl::ObPLPackageGuard package_guard(sql_session->get_effective_tenant_id()); + pl::ObPLPackageGuard *package_guard = NULL; CK (OB_NOT_NULL(sql_session->get_pl_engine())); - OZ (package_guard.init()); + OZ (ctx.exec_ctx_->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); if (OB_SUCC(ret)) { const ObPLVar *var = NULL; ObPLResolveCtx resolve_ctx(*ctx.allocator_, *sql_session, *ctx.exec_ctx_->get_sql_ctx()->schema_guard_, - package_guard, + *package_guard, *ctx.exec_ctx_->get_sql_proxy(), false /*is_ps*/); OZ (sql_session->get_pl_engine() diff --git a/src/pl/ob_pl_resolver.cpp b/src/pl/ob_pl_resolver.cpp index 5f4c2dba83..8cb9a3b25a 100644 --- a/src/pl/ob_pl_resolver.cpp +++ b/src/pl/ob_pl_resolver.cpp @@ -9340,6 +9340,17 @@ int ObPLResolver::build_raw_expr(const ParseNode *node, OZ (resolve_columns(expr, columns, unit_ast)); } OV (udf_info.count() <= 0, OB_ERR_UNEXPECTED, K(udf_info)); + if (OB_SUCC(ret) + && NULL != expr + && (T_FUN_SYS_PL_SEQ_NEXT_VALUE == expr->get_expr_type() + || T_FUN_SYS_SEQ_NEXTVAL == expr->get_expr_type())) { + ObSequenceRawExpr *seq_expr = static_cast(expr); + if (OB_INVALID_ID != seq_expr->get_dblink_id()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("dblink sequence not support in PL", K(ret)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "dblink sequence in PL"); + } + } if (OB_SUCC(ret) && op_exprs.count() > 0) { if (OB_FAIL(ObRawExprUtils::resolve_op_exprs_for_oracle_implicit_cast(expr_factory_, &resolve_ctx_.session_info_, op_exprs))) { @@ -9526,7 +9537,7 @@ int ObPLResolver::formalize_expr(ObRawExpr &expr, const ObUserDefinedType *user_type = NULL; OZ (ns.get_user_type(expr.get_udt_id(), user_type, NULL)); CK (OB_NOT_NULL(user_type)); - OZ (user_type->get_size(ns, PL_TYPE_INIT_SIZE, size)); + OZ (user_type->get_size(PL_TYPE_INIT_SIZE, size)); OX (const_cast(expr.get_result_type()).set_extend_size(size)); } for (int64_t i = 0; OB_SUCC(ret) && i < expr.get_param_count(); ++i) { @@ -10196,11 +10207,12 @@ int ObPLResolver::resolve_raw_expr(const ParseNode &node, } else if (OB_ISNULL(params.secondary_namespace_)) { HEAP_VAR(pl::ObPLFunctionAST, func_ast, *(params.allocator_)) { ObPLStmtBlock *null_block = NULL; - ObPLPackageGuard package_guard(params.session_info_->get_effective_tenant_id()); + ObPLPackageGuard dummy_pkg_guard(params.session_info_->get_effective_tenant_id()); + ObPLPackageGuard *package_guard = (NULL != params.package_guard_ ? params.package_guard_ : &dummy_pkg_guard); ObPLResolver resolver(*(params.allocator_), *(params.session_info_), *(params.schema_checker_->get_schema_guard()), - package_guard, + *package_guard, *(params.sql_proxy_), *(params.expr_factory_), NULL,/*parent ns*/ @@ -10208,7 +10220,9 @@ int ObPLResolver::resolve_raw_expr(const ParseNode &node, false, /*check mode*/ false, /*sql mode*/ params.param_list_); - OZ (package_guard.init()); + if (NULL == params.package_guard_) { + OZ (package_guard->init()); + } OZ (resolver.init(func_ast)); // build first namespace OZ (resolver.make_block(func_ast, NULL, null_block)); @@ -10753,24 +10767,30 @@ int ObPLResolver::resolve_obj_access_node(const ParseNode &node, ObSchemaGetterGuard &schema_guard, ObMySQLProxy &sql_proxy, ObIArray &obj_access_idents, - ObIArray& access_idxs) + ObIArray& access_idxs, + ObPLPackageGuard *package_guard) { int ret = OB_SUCCESS; - pl::ObPLPackageGuard package_guard(session_info.get_effective_tenant_id()); - ObArenaAllocator allocator(ObModIds::OB_MODULE_PAGE_ALLOCATOR, - OB_MALLOC_NORMAL_BLOCK_SIZE, - MTL_ID()); - // fake resolve_ctx, we only use session_info, schema_guard - ObPLResolveCtx resolve_ctx( - allocator, session_info, schema_guard, package_guard, *(GCTX.sql_proxy_), false); - ObPLExternalNS external_ns(resolve_ctx, NULL); - CK (T_SP_OBJ_ACCESS_REF == node.type_); - OZ (package_guard.init()); - OZ (ObPLResolver::resolve_obj_access_idents(node, expr_factory, obj_access_idents, session_info)); - for (int64_t i = 0; OB_SUCC(ret) && i < obj_access_idents.count(); ++i) { - OZ (ObPLResolver::resolve_access_ident(obj_access_idents.at(i), - external_ns, - access_idxs)); + pl::ObPLPackageGuard dummy_pkg_guard(session_info.get_effective_tenant_id()); + if (NULL == package_guard) { + package_guard = &dummy_pkg_guard; + OZ (package_guard->init()); + } + if (OB_SUCC(ret)) { + ObArenaAllocator allocator(ObModIds::OB_MODULE_PAGE_ALLOCATOR, + OB_MALLOC_NORMAL_BLOCK_SIZE, + MTL_ID()); + // fake resolve_ctx, we only use session_info, schema_guard + ObPLResolveCtx resolve_ctx( + allocator, session_info, schema_guard, *package_guard, *(GCTX.sql_proxy_), false); + ObPLExternalNS external_ns(resolve_ctx, NULL); + CK (T_SP_OBJ_ACCESS_REF == node.type_); + OZ (ObPLResolver::resolve_obj_access_idents(node, expr_factory, obj_access_idents, session_info)); + for (int64_t i = 0; OB_SUCC(ret) && i < obj_access_idents.count(); ++i) { + OZ (ObPLResolver::resolve_access_ident(obj_access_idents.at(i), + external_ns, + access_idxs)); + } } if (OB_FAIL(ret)) { record_error_line(const_cast(&node), session_info); @@ -11189,6 +11209,95 @@ int ObPLResolver::resolve_dblink_type(const ObString &dblink_name, return ret; } +int ObPLResolver::resolve_dblink_udf(sql::ObQualifiedName &q_name, + ObRawExprFactory &expr_factory, + ObRawExpr *&expr, + ObPLCompileUnitAST &unit_ast) +{ + int ret = OB_SUCCESS; + ObString db_name, pkg_name, routine_name; + const ObIRoutineInfo *routine_info = NULL; + ObSEArray expr_params; + int64_t cnt = q_name.access_idents_.count(); + OV (OB_LIKELY(cnt >= 1 && cnt <= 3), OB_ERR_UNEXPECTED, K(cnt), K(q_name)); + if (OB_SUCC(ret)) { + routine_name = q_name.access_idents_.at(cnt - 1).access_name_; + if (cnt >= 2) { + pkg_name = q_name.access_idents_.at(cnt - 2).access_name_; + } + if (3 == cnt) { + db_name = q_name.access_idents_.at(cnt - 3).access_name_; + } + } + OZ (ObRawExprUtils::rebuild_expr_params(q_name.access_idents_.at(cnt - 1).udf_info_, &expr_factory, expr_params)); + OZ (ObPLResolver::resolve_dblink_routine(resolve_ctx_, + q_name.dblink_name_, + db_name, + pkg_name, + routine_name, + expr_params, + routine_info)); + CK (OB_NOT_NULL(routine_info)); + if (OB_SUCC(ret)) { + ObUDFInfo &udf_info = q_name.access_idents_.at(cnt - 1).udf_info_; + const ObRoutineInfo *sch_routine_info = static_cast(routine_info); + bool is_pkg_func = (OB_INVALID_ID != sch_routine_info->get_package_id()); + uint64_t routine_id = is_pkg_func ? sch_routine_info->get_subprogram_id() : sch_routine_info->get_routine_id(); + ObSchemaChecker schema_checker; + ObString dblink_name; + if (sch_routine_info->is_procedure()) { + ObSqlString object_name; + construct_name(db_name, pkg_name, routine_name, object_name); + ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS; + LOG_WARN("routine is a procedure", K(ret), K(object_name.string())); + LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, object_name.string().length(), object_name.string().ptr()); + } else if (is_valid_id(sch_routine_info->get_dblink_id())) { + const ObDbLinkSchema *dblink_schema = NULL; + OZ (resolve_ctx_.schema_guard_.get_dblink_schema(resolve_ctx_.session_info_.get_effective_tenant_id(), + sch_routine_info->get_dblink_id(), + dblink_schema)); + CK (OB_NOT_NULL(dblink_schema)); + OX (dblink_name = dblink_schema->get_dblink_name()); + } + OZ (schema_checker.init(resolve_ctx_.schema_guard_, resolve_ctx_.session_info_.get_sessid())); + OZ (ObRawExprUtils::resolve_udf_common_info(db_name, + sch_routine_info->is_dblink_routine() ? + sch_routine_info->get_dblink_pkg_name() : pkg_name, + routine_id, + sch_routine_info->get_package_id(), + ObArray(), + common::OB_INVALID_VERSION, + common::OB_INVALID_VERSION, + false, + true, + false, + false, + common::OB_INVALID_ID, + udf_info, + sch_routine_info->get_dblink_id(), + dblink_name)); + OZ (ObRawExprUtils::resolve_udf_param_types(sch_routine_info, + resolve_ctx_.schema_guard_, + resolve_ctx_.session_info_, + resolve_ctx_.allocator_, + resolve_ctx_.sql_proxy_, + udf_info, + resolve_ctx_.package_guard_.dblink_guard_), udf_info); + OZ (ObRawExprUtils::resolve_udf_param_exprs(sch_routine_info, + current_block_->get_namespace(), + schema_checker, + resolve_ctx_.session_info_, + resolve_ctx_.allocator_, + resolve_ctx_.is_prepare_protocol_, + expr_factory, + resolve_ctx_.sql_proxy_, + resolve_ctx_.extern_param_info_, + udf_info), udf_info); + // 需要做类似 ObPLResolver::resolve_udf_info 最后的 params_type 设置吗? + } + OX (unit_ast.set_can_cached(false)); + return ret; +} int ObPLResolver::resolve_qualified_identifier(ObQualifiedName &q_name, ObIArray &columns, @@ -11416,7 +11525,7 @@ int ObPLResolver::resolve_record_construct(const ObQualifiedName &q_name, } OZ (expr_factory_.create_raw_expr(T_FUN_PL_OBJECT_CONSTRUCT, object_expr)); CK (OB_NOT_NULL(object_expr)); - OZ (user_type->get_size(current_block_->get_namespace(), pl::PL_TYPE_ROW_SIZE, rowsize)); + OZ (user_type->get_size(pl::PL_TYPE_ROW_SIZE, rowsize)); OX (object_expr->set_rowsize(rowsize)); OX (res_type.set_type(ObExtendType)); OX (res_type.set_extend_type(PL_RECORD_TYPE)); @@ -11647,11 +11756,7 @@ int ObPLResolver::resolve_qualified_name(ObQualifiedName &q_name, int ret = OB_SUCCESS; SET_LOG_CHECK_MODE(); - if (!q_name.dblink_name_.empty()) { - ret = OB_NOT_SUPPORTED; - LOG_WARN("dblink sequence and udf not support in PL", K(ret), K(q_name)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "dblink sequence and udf in PL"); - } + OZ (replace_udf_param_expr(q_name, columns, real_exprs)); if (OB_FAIL(ret)) { } else if (q_name.is_sys_func()) { @@ -11689,18 +11794,22 @@ int ObPLResolver::resolve_qualified_name(ObQualifiedName &q_name, } else { if (OB_SUCC(ret)) { if (q_name.is_pl_udf()) { - ObSEArray access_idxs; - OZ (resolve_name(q_name, current_block_->get_namespace(), expr_factory_, &resolve_ctx_.session_info_, access_idxs, unit_ast)); - if (OB_FAIL(ret)) { - } else if (access_idxs.at(access_idxs.count() - 1).is_udf_type()) { - OX (expr = reinterpret_cast(access_idxs.at(access_idxs.count() - 1).get_sysfunc_)); + if (q_name.dblink_name_.empty()) { + ObSEArray access_idxs; + OZ (resolve_name(q_name, current_block_->get_namespace(), expr_factory_, &resolve_ctx_.session_info_, access_idxs, unit_ast)); + if (OB_FAIL(ret)) { + } else if (access_idxs.at(access_idxs.count() - 1).is_udf_type()) { + OX (expr = reinterpret_cast(access_idxs.at(access_idxs.count() - 1).get_sysfunc_)); + } else { + OZ (make_var_from_access(access_idxs, + expr_factory_, + &(resolve_ctx_.session_info_), + &(resolve_ctx_.schema_guard_), + current_block_->get_namespace(), + expr)); + } } else { - OZ (make_var_from_access(access_idxs, - expr_factory_, - &(resolve_ctx_.session_info_), - &(resolve_ctx_.schema_guard_), - current_block_->get_namespace(), - expr)); + OZ (resolve_dblink_udf(q_name, expr_factory_, expr, unit_ast)); } CK (OB_NOT_NULL(expr)); } else { // 如果是udf return access,需要当做var解析 @@ -11876,16 +11985,27 @@ int ObPLResolver::add_udt_self_argument(const ObIRoutineInfo *routine_info, #endif int ObPLResolver::resolve_udf_info( - ObUDFInfo &udf_info, ObIArray &access_idxs, ObPLCompileUnitAST &func) + ObUDFInfo &udf_info, ObIArray &access_idxs, ObPLCompileUnitAST &func, + const ObIRoutineInfo *routine_info) { int ret = OB_SUCCESS; ObString db_name = udf_info.udf_database_; ObString package_name = udf_info.udf_package_; ObString udf_name = udf_info.udf_name_; ObSchemaChecker schema_checker; - const ObIRoutineInfo *routine_info = NULL; ObProcType routine_type = STANDALONE_FUNCTION; ObSEArray expr_params; + ObString dblink_name; + +#define GET_DBLINK_NAME(real_routine_info) \ + if (OB_SUCC(ret) && is_valid_id(real_routine_info->get_dblink_id())) { \ + const ObDbLinkSchema *dblink_schema = NULL; \ + OZ (resolve_ctx_.schema_guard_.get_dblink_schema(resolve_ctx_.session_info_.get_effective_tenant_id(), \ + real_routine_info->get_dblink_id(), \ + dblink_schema)); \ + CK (OB_NOT_NULL(dblink_schema)); \ + OX (dblink_name = dblink_schema->get_dblink_name()); \ + } CK (OB_NOT_NULL(udf_info.ref_expr_)); CK (OB_NOT_NULL(current_block_)); @@ -11895,13 +12015,15 @@ int ObPLResolver::resolve_udf_info( { ObPLMockSelfArg self(access_idxs, expr_params, expr_factory_, resolve_ctx_.session_info_);; OZ (self.mock()); - OZ (current_block_->get_namespace().resolve_routine(resolve_ctx_, - udf_info.udf_database_, - udf_info.udf_package_, - udf_info.udf_name_, - expr_params, - routine_type, - routine_info), K(udf_info)); + if (OB_ISNULL(routine_info) || (OB_NOT_NULL(routine_info) && !is_valid_id(routine_info->get_dblink_id()))) { + OZ (current_block_->get_namespace().resolve_routine(resolve_ctx_, + udf_info.udf_database_, + udf_info.udf_package_, + udf_info.udf_name_, + expr_params, + routine_type, + routine_info), K(udf_info)); + } } if (OB_SUCC(ret) && OB_NOT_NULL(routine_info)) { @@ -12019,6 +12141,7 @@ int ObPLResolver::resolve_udf_info( package_routine_info->get_pkg_id(), cur_pkg_version)); } + GET_DBLINK_NAME(package_routine_info); OZ (ObRawExprUtils::resolve_udf_common_info(db_name, package_name, package_routine_info->get_id(), @@ -12031,13 +12154,16 @@ int ObPLResolver::resolve_udf_info( is_package_body_udf, false, common::OB_INVALID_ID, - udf_info)); + udf_info, + package_routine_info->get_dblink_id(), + dblink_name)); OZ (ObRawExprUtils::resolve_udf_param_types(package_routine_info, resolve_ctx_.schema_guard_, resolve_ctx_.session_info_, resolve_ctx_.allocator_, resolve_ctx_.sql_proxy_, - udf_info), udf_info); + udf_info, + resolve_ctx_.package_guard_.dblink_guard_), udf_info); OZ (ObRawExprUtils::resolve_udf_param_exprs(package_routine_info, current_block_->get_namespace(), schema_checker, @@ -12067,7 +12193,9 @@ int ObPLResolver::resolve_udf_info( OX (udf_info.is_udt_udf_ = schema_routine_info->is_udt_routine()); - if (OB_SUCC(ret) && routine_id == schema_routine_info->get_subprogram_id()) { + if (OB_SUCC(ret) + && routine_id == schema_routine_info->get_subprogram_id() + && OB_INVALID_ID == schema_routine_info->get_dblink_id()) { if (!udf_info.is_udt_udf_) { OZ (resolve_ctx_.schema_guard_.get_package_info( schema_routine_info->get_tenant_id(), schema_routine_info->get_package_id(), package_info)); @@ -12094,8 +12222,10 @@ int ObPLResolver::resolve_udf_info( } #endif } + GET_DBLINK_NAME(schema_routine_info); OZ (ObRawExprUtils::resolve_udf_common_info(db_name, - package_name, + schema_routine_info->is_dblink_routine() ? + schema_routine_info->get_dblink_pkg_name() : package_name, routine_id, schema_routine_info->get_package_id(), ObArray(), @@ -12110,13 +12240,16 @@ int ObPLResolver::resolve_udf_info( false, /*is_pkg_body_udf*/ schema_routine_info->is_aggregate(), schema_routine_info->get_type_id(), - udf_info)); + udf_info, + schema_routine_info->get_dblink_id(), + dblink_name)); OZ (ObRawExprUtils::resolve_udf_param_types(schema_routine_info, resolve_ctx_.schema_guard_, resolve_ctx_.session_info_, resolve_ctx_.allocator_, resolve_ctx_.sql_proxy_, - udf_info), udf_info); + udf_info, + resolve_ctx_.package_guard_.dblink_guard_), udf_info); OZ (ObRawExprUtils::resolve_udf_param_exprs(schema_routine_info, current_block_->get_namespace(), schema_checker, @@ -12145,13 +12278,16 @@ int ObPLResolver::resolve_udf_info( == current_block_->get_namespace().get_block_type(), false, common::OB_INVALID_ID, - udf_info)); + udf_info, + common::OB_INVALID_ID, + "")); OZ (ObRawExprUtils::resolve_udf_param_types(sub_routine_info, resolve_ctx_.schema_guard_, resolve_ctx_.session_info_, resolve_ctx_.allocator_, resolve_ctx_.sql_proxy_, - udf_info), udf_info); + udf_info, + resolve_ctx_.package_guard_.dblink_guard_), udf_info); OZ (ObRawExprUtils::resolve_udf_param_exprs(sub_routine_info, current_block_->get_namespace(), schema_checker, @@ -13681,6 +13817,16 @@ int ObPLResolver::resolve_routine(ObObjAccessIdent &access_ident, routine_name, expr_params, routine_info)); OX (func.set_can_cached(false)); } + if (OB_SUCC(ret) + && NULL != routine_info + && NULL != routine_info->get_ret_info() + && OB_INVALID_ID != routine_info->get_dblink_id()) { + CK (access_ident.is_pl_udf()); + CK (OB_NOT_NULL(access_ident.udf_info_.ref_expr_)); + if (OB_SUCC(ret)) { + access_ident.udf_info_.ref_expr_->set_func_name(routine_info->get_routine_name()); + } + } } if (OB_FAIL(ret) @@ -13792,7 +13938,9 @@ int ObPLResolver::resolve_function(ObObjAccessIdent &access_ident, resolve_ctx_.session_info_, resolve_ctx_.allocator_, resolve_ctx_.sql_proxy_, - return_type)); + return_type, + NULL, + &resolve_ctx_.package_guard_.dblink_guard_)); } else { OX (return_type = routine_info->get_ret_info()->get_pl_data_type()); } @@ -13803,7 +13951,7 @@ int ObPLResolver::resolve_function(ObObjAccessIdent &access_ident, access_ident.udf_info_.udf_database_, access_ident.udf_info_.udf_package_, access_ident.udf_info_.udf_name_)); - OZ (resolve_udf_info(access_ident.udf_info_, access_idxs, func), K(access_ident)); + OZ (resolve_udf_info(access_ident.udf_info_, access_idxs, func, routine_info), K(access_ident)); if (OB_SUCC(ret) && access_ident.udf_info_.is_new_keyword_used_ && !access_ident.udf_info_.is_udf_udt_cons()) { @@ -15149,15 +15297,17 @@ int ObPLResolver::resolve_sequence_object(const ObQualifiedName &q_name, LOG_WARN("init schemachecker failed."); } else { // check if sequence is created. will also check synonym + uint64_t dblink_id = OB_INVALID_ID; if (OB_FAIL(ob_sequence_ns_checker_.check_sequence_namespace(q_name, syn_checker, &resolve_ctx_.session_info_, &sc, - seq_id))) { + seq_id, + &dblink_id))) { LOG_WARN_IGNORE_COL_NOTFOUND(ret, "check basic column namespace failed", K(ret), K(q_name)); } else if(OB_FAIL(build_seq_value_expr(real_ref_expr, q_name, seq_id))) { LOG_WARN("failed to resolve seq.", K(ret)); - } else { + } else if (OB_INVALID_ID == dblink_id) { int64_t schema_version = OB_INVALID_VERSION; ObSchemaObjVersion obj_version; const uint64_t tenant_id = resolve_ctx_.session_info_.get_effective_tenant_id(); @@ -15170,6 +15320,11 @@ int ObPLResolver::resolve_sequence_object(const ObQualifiedName &q_name, OX (obj_version.object_type_ = DEPENDENCY_SEQUENCE); OX (obj_version.version_ = schema_version); OZ (unit_ast.add_dependency_object(obj_version)); + } else { + ObSequenceRawExpr *seq_expr = static_cast(real_ref_expr); + seq_expr->set_dblink_name(q_name.dblink_name_); + seq_expr->set_dblink_id(dblink_id); + unit_ast.set_can_cached(false); } if (OB_SUCC(ret)) { unit_ast.set_has_sequence(); diff --git a/src/pl/ob_pl_resolver.h b/src/pl/ob_pl_resolver.h index bd409c4667..2c3837dbcd 100644 --- a/src/pl/ob_pl_resolver.h +++ b/src/pl/ob_pl_resolver.h @@ -373,7 +373,8 @@ public: share::schema::ObSchemaGetterGuard &schema_guard, ObMySQLProxy &sql_proxy, ObIArray &obj_access_idents, - ObIArray& access_idxs); + ObIArray& access_idxs, + ObPLPackageGuard *package_guard); static int resolve_cparam_list_simple(const ParseNode &node, ObRawExprFactory &expr_factory, @@ -576,7 +577,7 @@ public: static int adjust_routine_param_type(ObPLDataType &type); int resolve_udf_info( - sql::ObUDFInfo &udf_info, ObIArray &access_idxs, ObPLCompileUnitAST &func); + sql::ObUDFInfo &udf_info, ObIArray &access_idxs, ObPLCompileUnitAST &func, const ObIRoutineInfo *routine_info = NULL); int construct_name(ObString &database_name, ObString &package_name, ObString &routine_name, ObSqlString &object_name); static int resolve_dblink_routine(ObPLResolveCtx &resolve_ctx, @@ -601,6 +602,10 @@ public: const ObString &udt_name, ObPLCompileUnitAST &func, ObPLDataType &pl_type); + int resolve_dblink_udf(sql::ObQualifiedName &q_name, + ObRawExprFactory &expr_factory, + ObRawExpr *&expr, + ObPLCompileUnitAST &unit_ast); private: int resolve_declare_var(const ObStmtNodeTree *parse_tree, ObPLDeclareVarStmt *stmt, ObPLFunctionAST &func_ast); int resolve_declare_var(const ObStmtNodeTree *parse_tree, ObPLPackageAST &package_ast); diff --git a/src/pl/ob_pl_type.cpp b/src/pl/ob_pl_type.cpp index a19179ddda..a6eaa50e70 100644 --- a/src/pl/ob_pl_type.cpp +++ b/src/pl/ob_pl_type.cpp @@ -26,6 +26,7 @@ #include "sql/resolver/expr/ob_raw_expr_copier.h" #include "pl/ob_pl_user_type.h" #include "dblink/ob_pl_dblink_guard.h" +#include "sql/resolver/ob_stmt_resolver.h" namespace oceanbase { using namespace common; @@ -763,9 +764,8 @@ int ObPLDataType::newx(common::ObIAllocator &allocator, const ObPLINS *ns, int64 return ret; } -int ObPLDataType::get_size(const ObPLINS& ns, ObPLTypeSize type, int64_t &size) const +int ObPLDataType::get_size(ObPLTypeSize type, int64_t &size) const { - UNUSED(ns); UNUSED(type); int ret = OB_SUCCESS; // if (is_obj_type()) { @@ -1151,7 +1151,7 @@ int ObPLDataType::deserialize(ObSchemaGetterGuard &schema_guard, uint16_t flags; ObScale num_decimals; ObObj param; - if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get base type init size failed", K(ret)); } else if (OB_ISNULL(dst) || (dst_len - dst_pos < init_size)) { ret = OB_SIZE_OVERFLOW; diff --git a/src/pl/ob_pl_type.h b/src/pl/ob_pl_type.h index e6f8e3546f..d9a1e45624 100644 --- a/src/pl/ob_pl_type.h +++ b/src/pl/ob_pl_type.h @@ -509,7 +509,7 @@ public: jit::ObLLVMValue &value, const pl::ObPLStmt *stmt = NULL) const; virtual int newx(common::ObIAllocator &allocator, const ObPLINS *ns, int64_t &ptr) const; - virtual int get_size(const ObPLINS& ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int init_session_var(const ObPLResolveCtx &resolve_ctx, common::ObIAllocator &obj_allocator, sql::ObExecContext &exec_ctx, diff --git a/src/pl/ob_pl_user_type.cpp b/src/pl/ob_pl_user_type.cpp index 2cfd0ad6b1..98d577f96a 100644 --- a/src/pl/ob_pl_user_type.cpp +++ b/src/pl/ob_pl_user_type.cpp @@ -74,9 +74,9 @@ int ObUserDefinedType::generate_copy( } int ObUserDefinedType::get_size( - const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const + ObPLTypeSize type, int64_t &size) const { - UNUSEDx(ns, type, size); + UNUSEDx(type, size); LOG_WARN_RET(OB_NOT_SUPPORTED, "Call virtual func of ObUserDefinedType! May forgot implement in SubClass", K(this)); return OB_NOT_SUPPORTED; } @@ -690,10 +690,10 @@ int ObUserDefinedSubType::newx(common::ObIAllocator &allocator, return ret; } -int ObUserDefinedSubType::get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const +int ObUserDefinedSubType::get_size(ObPLTypeSize type, int64_t &size) const { int ret = OB_SUCCESS; - OZ (base_type_.get_size(ns, type, size)); + OZ (base_type_.get_size(type, size)); return ret; } @@ -782,9 +782,9 @@ int ObRefCursorType::newx(common::ObIAllocator &allocator, const ObPLINS *ns, in return ret; } -int ObRefCursorType::get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const +int ObRefCursorType::get_size(ObPLTypeSize type, int64_t &size) const { - UNUSEDx(ns, type, size); + UNUSEDx(type, size); size = sizeof(ObPLCursorInfo) + 8; return OB_SUCCESS; } @@ -804,7 +804,7 @@ int ObRefCursorType::init_obj(ObSchemaGetterGuard &schema_guard, MEMSET(data, 0, init_size); new(data) ObPLCursorInfo(&allocator); obj.set_ext(reinterpret_cast(data)); - } else if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -828,7 +828,7 @@ int ObRefCursorType::init_session_var(const ObPLResolveCtx &resolve_ctx, int ret = OB_SUCCESS; char *data = NULL; int64_t init_size = 0; - if (OB_FAIL(get_size(resolve_ctx, PL_TYPE_INIT_SIZE, init_size))) { + if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(obj_allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -1203,7 +1203,7 @@ int ObRecordType::newx(common::ObIAllocator &allocator, const ObPLINS *ns, int64 } else { int64_t init_size = OB_INVALID_SIZE; int64_t member_ptr = 0; - OZ (get_member(i)->get_size(*ns, PL_TYPE_INIT_SIZE, init_size)); + OZ (get_member(i)->get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (get_member(i)->newx(allocator, ns, member_ptr)); OX (member->set_extend(member_ptr, get_member(i)->get_type(), init_size)); } @@ -1348,7 +1348,7 @@ int ObRecordType::generate_default_value(ObPLCodeGenerator &generator, OZ (generator.set_current(null_branch)); OZ (SMART_CALL(member->member_type_.generate_new(generator, ns, extend_value, stmt))); OZ (generator.get_helper().get_int8(member->member_type_.get_type(), type_value)); - OZ (member->member_type_.get_size(ns, PL_TYPE_INIT_SIZE, init_size)); + OZ (member->member_type_.get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (generator.get_helper().get_int32(init_size, init_value)); OZ (generator.generate_set_extend(ptr_elem, type_value, init_value, extend_value)); OZ (generator.generate_null(ObIntType, allocator)); @@ -1367,7 +1367,7 @@ int ObRecordType::generate_default_value(ObPLCodeGenerator &generator, int64_t init_size = OB_INVALID_SIZE; OZ (SMART_CALL(member->member_type_.generate_new(generator, ns, extend_value, stmt))); OZ (generator.get_helper().get_int8(member->member_type_.get_type(), type_value)); - OZ (member->member_type_.get_size(ns, PL_TYPE_INIT_SIZE, init_size)); + OZ (member->member_type_.get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (generator.get_helper().get_int32(init_size, init_value)); OZ (generator.generate_set_extend(ptr_elem, type_value, init_value, extend_value)); } @@ -1377,14 +1377,14 @@ int ObRecordType::generate_default_value(ObPLCodeGenerator &generator, return ret; } -int ObRecordType::get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const +int ObRecordType::get_size(ObPLTypeSize type, int64_t &size) const { int ret = OB_SUCCESS; size += get_data_offset(get_record_member_count()); for (int64_t i = 0; OB_SUCC(ret) && i < get_record_member_count(); ++i) { const ObPLDataType *elem_type = get_record_member_type(i); CK (OB_NOT_NULL(elem_type)); - OZ (elem_type->get_size(ns, type, size)); + OZ (elem_type->get_size(type, size)); } return ret; } @@ -1411,7 +1411,7 @@ int ObRecordType::init_session_var(const ObPLResolveCtx &resolve_ctx, } if (OB_FAIL(ret) || obj.is_pl_extend()) { // do nothing ... - } else if (OB_FAIL(get_size(resolve_ctx, PL_TYPE_INIT_SIZE, init_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(obj_allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -1452,7 +1452,7 @@ int ObRecordType::init_session_var(const ObPLResolveCtx &resolve_ctx, } else { int64_t init_size = OB_INVALID_SIZE; int64_t member_ptr = 0; - OZ (get_member(i)->get_size(resolve_ctx, PL_TYPE_INIT_SIZE, init_size)); + OZ (get_member(i)->get_size(PL_TYPE_INIT_SIZE, init_size)); OZ (get_member(i)->newx(obj_allocator, &resolve_ctx, member_ptr)); OX (member->set_extend(member_ptr, get_member(i)->get_type(), init_size)); } @@ -1480,7 +1480,7 @@ int ObRecordType::free_session_var(const ObPLResolveCtx &resolve_ctx, } else { if (OB_FAIL(type->free_data(resolve_ctx, obj_allocator, static_cast(data)+data_pos))) { LOG_WARN("failed to get element serialize size", K(*this), K(ret)); - } else if (OB_FAIL(type->get_size(resolve_ctx, PL_TYPE_INIT_SIZE, element_init_size))) { + } else if (OB_FAIL(type->get_size(PL_TYPE_INIT_SIZE, element_init_size))) { LOG_WARN("get record element init size failed", K(ret)); } else { data_pos += element_init_size; @@ -1513,7 +1513,7 @@ int ObRecordType::free_data(const ObPLResolveCtx &resolve_ctx, } else { if (OB_FAIL(type->free_data(resolve_ctx, data_allocator, static_cast(data)+data_pos))) { LOG_WARN("failed to get element serialize size", K(*this), K(ret)); - } else if (OB_FAIL(type->get_size(resolve_ctx, PL_TYPE_INIT_SIZE, element_init_size))) { + } else if (OB_FAIL(type->get_size(PL_TYPE_INIT_SIZE, element_init_size))) { LOG_WARN("get record element init size failed", K(ret)); } else { data_pos += element_init_size; @@ -1629,7 +1629,7 @@ int ObRecordType::init_obj(ObSchemaGetterGuard &schema_guard, int ret = OB_SUCCESS; char *data = NULL; init_size = 0; - if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -1724,7 +1724,7 @@ int ObRecordType::deserialize(ObSchemaGetterGuard &schema_guard, if (OB_ISNULL(record)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("record is null", K(ret), KP(dst), KP(record)); - } else if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("failed to get record type init size", K(ret)); } else if (OB_ISNULL(dst) || (dst_len - dst_pos < init_size)) { ret = OB_DESERIALIZE_ERROR; @@ -1833,14 +1833,14 @@ int ObRecordType::convert(ObPLResolveCtx &ctx, ObObj *&src, ObObj *&dst) const #ifdef OB_BUILD_ORACLE_PL //---------- for ObOpaqueType ---------- -int ObOpaqueType::get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const +int ObOpaqueType::get_size(ObPLTypeSize type, int64_t &size) const { int ret = OB_SUCCESS; if (PL_TYPE_INIT_SIZE == type) { ObPLOpaque opaque; size += opaque.get_init_size(); } else { - OZ (ObUserDefinedType::get_size(ns, type, size)); + OZ (ObUserDefinedType::get_size(type, size)); } return ret; } @@ -1889,7 +1889,7 @@ int ObOpaqueType::init_session_var(const ObPLResolveCtx &resolve_ctx, } if (OB_FAIL(ret) || obj.is_pl_extend()) { // do nothing ... - } else if (OB_FAIL(get_size(resolve_ctx, PL_TYPE_INIT_SIZE, init_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(obj_allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -2075,11 +2075,11 @@ int ObCollectionType::get_init_size(int64_t &size) const return ret; } -int ObCollectionType::get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const +int ObCollectionType::get_size(ObPLTypeSize type, int64_t &size) const { int ret = OB_SUCCESS; if (PL_TYPE_ROW_SIZE == type) { - OZ (get_element_type().get_size(ns, type, size)); + OZ (get_element_type().get_size(type, size)); } else if (PL_TYPE_INIT_SIZE == type) { OZ (get_init_size(size)); } else { @@ -2130,7 +2130,7 @@ int ObCollectionType::set_row_size(ObPLCodeGenerator &generator, const ObPLINS & int64_t rowsize = 0; ObLLVMValue p_rowsize; OZ (generator.extract_rowsize_ptr_from_collection(collection, p_rowsize)); - OZ (get_size(ns, PL_TYPE_ROW_SIZE, rowsize)); + OZ (get_size(PL_TYPE_ROW_SIZE, rowsize)); OZ (generator.get_helper().create_istore(rowsize, p_rowsize)); return ret; } @@ -2159,9 +2159,9 @@ int ObCollectionType::init_session_var(const ObPLResolveCtx &resolve_ctx, } if (OB_FAIL(ret) || obj.is_pl_extend()) { // do nothing ... - } else if (OB_FAIL(get_size(resolve_ctx, PL_TYPE_INIT_SIZE, init_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); - } else if (OB_FAIL(get_size(resolve_ctx, PL_TYPE_ROW_SIZE, row_size))) { + } else if (OB_FAIL(get_size(PL_TYPE_ROW_SIZE, row_size))) { LOG_WARN("get row size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(obj_allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -2221,7 +2221,7 @@ int ObCollectionType::free_session_var(const ObPLResolveCtx &resolve_ctx, if (!OB_ISNULL(data)) { ObPLNestedTable *table = reinterpret_cast(data); int64_t element_init_size = 0; - if (OB_FAIL(element_type_.get_size(resolve_ctx, PL_TYPE_INIT_SIZE, element_init_size))) { + if (OB_FAIL(element_type_.get_size(PL_TYPE_INIT_SIZE, element_init_size))) { LOG_WARN("get table element type init size failed", K(ret)); } else { char *free_ptr = NULL; @@ -2259,7 +2259,7 @@ int ObCollectionType::free_data(const ObPLResolveCtx &resolve_ctx, if (!OB_ISNULL(data)) { ObPLNestedTable *table = reinterpret_cast(data); int64_t element_init_size = 0; - if (OB_FAIL(element_type_.get_size(resolve_ctx, PL_TYPE_INIT_SIZE, element_init_size))) { + if (OB_FAIL(element_type_.get_size(PL_TYPE_INIT_SIZE, element_init_size))) { LOG_WARN("get table element type init size failed", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < table->get_count() ;i++) { @@ -2485,7 +2485,7 @@ int ObCollectionType::init_obj(ObSchemaGetterGuard &schema_guard, int ret = OB_SUCCESS; char *data = NULL; init_size = 0; - if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get init size failed", K(ret)); } else if (OB_ISNULL(data = static_cast(allocator.alloc(init_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -2592,12 +2592,12 @@ int ObCollectionType::deserialize(ObSchemaGetterGuard &schema_guard, int64_t element_init_size = 0; int64_t field_cnt = OB_INVALID_COUNT; - if (OB_FAIL(get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, init_size))) { + if (OB_FAIL(get_size(PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("get table type init size failed", K(ret), KPC(this)); } else if (OB_ISNULL(dst) || (dst_len - dst_pos) < init_size) { ret = OB_DESERIALIZE_ERROR; LOG_WARN("data deserialize failed", K(ret), K(dst), K(init_size), K(dst_len), K(dst_pos)); - } else if (OB_FAIL(element_type_.get_size(ObPLUDTNS(schema_guard), PL_TYPE_INIT_SIZE, element_init_size))) { + } else if (OB_FAIL(element_type_.get_size(PL_TYPE_INIT_SIZE, element_init_size))) { LOG_WARN("get element init size failed", K(ret), KPC(this), K(init_size)); } else if (OB_FAIL(element_type_.get_field_count(ObPLUDTNS(schema_guard), field_cnt))) { LOG_WARN("get field count failed", K(ret)); @@ -2742,7 +2742,7 @@ int ObCollectionType::convert(ObPLResolveCtx &ctx, ObObj *&src, ObObj *&dst) con CK (OB_LIKELY(dst->is_ext())); CK (OB_NOT_NULL(src_table = reinterpret_cast(src->get_ext()))); CK (OB_NOT_NULL(dst_table = reinterpret_cast(dst->get_ext()))); - OZ (element_type_.get_size(ctx, PL_TYPE_INIT_SIZE, element_init_size)); + OZ (element_type_.get_size(PL_TYPE_INIT_SIZE, element_init_size)); if (OB_SUCC(ret) && OB_ISNULL(collection_allocator diff --git a/src/pl/ob_pl_user_type.h b/src/pl/ob_pl_user_type.h index 5939d461d0..bdd9f3c584 100644 --- a/src/pl/ob_pl_user_type.h +++ b/src/pl/ob_pl_user_type.h @@ -88,7 +88,7 @@ public: const ObPLINS *ns, int64_t &ptr) const; - virtual int get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int init_session_var(const ObPLResolveCtx &resolve_ctx, common::ObIAllocator &obj_allocator, sql::ObExecContext &exec_ctx, @@ -197,7 +197,7 @@ public: virtual int get_all_depended_user_type(const ObPLResolveCtx &resolve_ctx, const ObPLBlockNS ¤t_ns) const; - virtual int get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int serialize(share::schema::ObSchemaGetterGuard &schema_guard, const common::ObTimeZoneInfo *tz_info, obmysql::MYSQL_PROTOCOL_TYPE type, char *&src, char *dst, const int64_t dst_len, int64_t &dst_pos) const; @@ -261,8 +261,7 @@ public: public: int deep_copy(common::ObIAllocator &alloc, const ObRefCursorType &other); - virtual int get_size( - const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int init_obj( share::schema::ObSchemaGetterGuard &schema_guard, @@ -415,7 +414,7 @@ public: const ObPLINS *ns, int64_t &ptr) const; - virtual int get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int init_session_var(const ObPLResolveCtx &resolve_ctx, common::ObIAllocator &obj_allocator, @@ -492,7 +491,7 @@ public: virtual int64_t get_member_count() const { return 0; } virtual const ObPLDataType *get_member(int64_t i) const { UNUSED(i); return NULL; } - virtual int get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int get_all_depended_user_type( const ObPLResolveCtx &resolve_ctx, const ObPLBlockNS ¤t_ns) const @@ -563,7 +562,7 @@ public: const ObPLINS *ns, int64_t &ptr) const; - virtual int get_size(const ObPLINS &ns, ObPLTypeSize type, int64_t &size) const; + virtual int get_size(ObPLTypeSize type, int64_t &size) const; virtual int init_session_var(const ObPLResolveCtx &resolve_ctx, common::ObIAllocator &obj_allocator, diff --git a/src/share/schema/ob_routine_info.h b/src/share/schema/ob_routine_info.h index 3cbe5149d3..24bdac7dec 100644 --- a/src/share/schema/ob_routine_info.h +++ b/src/share/schema/ob_routine_info.h @@ -593,6 +593,10 @@ public: return is_udt_routine() && SP_FLAG_STATIC == (flag_ & SP_FLAG_STATIC); } + OB_INLINE bool is_dblink_routine() const { + return dblink_id_ != OB_INVALID_ID; + } + TO_STRING_KV(K_(tenant_id), K_(database_id), K_(package_id), diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 71b45984a7..19c26c5b84 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -8055,6 +8055,7 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical if (OB_SUCC(ret)) { phy_plan_->set_contain_pl_udf_or_trigger(log_plan.get_stmt()->get_query_ctx()->has_pl_udf_); phy_plan_->set_udf_has_dml_stmt(log_plan.get_stmt()->get_query_ctx()->udf_has_dml_stmt_); + phy_plan_->set_has_link_udf(log_plan.get_stmt()->get_query_ctx()->has_dblink_udf_); } } if (OB_SUCC(ret)) { diff --git a/src/sql/dblink/ob_dblink_utils.cpp b/src/sql/dblink/ob_dblink_utils.cpp index d7d380f2d5..613c6eef24 100644 --- a/src/sql/dblink/ob_dblink_utils.cpp +++ b/src/sql/dblink/ob_dblink_utils.cpp @@ -561,6 +561,25 @@ int ObDblinkUtils::has_reverse_link_or_any_dblink(const ObDMLStmt *stmt, bool &h } } } + if (OB_SUCC(ret) && !has && enable_check_any_dblink) { + ObSEArray udf_exprs; + if (OB_FAIL(stmt->get_udf_exprs(udf_exprs))) { + LOG_WARN("failed to get udf exprs", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && !has && i < udf_exprs.count(); ++i) { + ObRawExpr *expr = udf_exprs.at(i); + if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (expr->is_udf_expr()) { + ObUDFRawExpr *udf_expr = static_cast(expr); + if (OB_INVALID_ID != udf_expr->get_dblink_id()) { + has = true; + } + } + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < stmt->get_table_items().count(); ++i) { const TableItem *table_item = stmt->get_table_items().at(i); if (OB_ISNULL(table_item)) { diff --git a/src/sql/engine/cmd/ob_routine_executor.cpp b/src/sql/engine/cmd/ob_routine_executor.cpp index b943b1adec..4e82d7f954 100644 --- a/src/sql/engine/cmd/ob_routine_executor.cpp +++ b/src/sql/engine/cmd/ob_routine_executor.cpp @@ -218,7 +218,8 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st ctx.get_allocator(), NULL, stmt.get_dblink_routine_info(), - params))) { + params, + NULL))) { LOG_WARN("failed to execute dblink pl", K(ret), KP(stmt.get_dblink_routine_info())); #endif } diff --git a/src/sql/engine/expr/ob_expr_cast.cpp b/src/sql/engine/expr/ob_expr_cast.cpp index 9728be47bf..f7ab9e13c4 100644 --- a/src/sql/engine/expr/ob_expr_cast.cpp +++ b/src/sql/engine/expr/ob_expr_cast.cpp @@ -951,8 +951,7 @@ int ObExprCast::fill_element(const sql::ObExpr &expr, int64_t init_size = OB_INVALID_SIZE; if (OB_FAIL(collection_type->get_element_type().newx(*coll->get_allocator(), ns, ptr))) { LOG_WARN("failed to new element", K(ret)); - } else if (OB_FAIL(collection_type->get_element_type().get_size(*ns, - pl::PL_TYPE_INIT_SIZE, + } else if (OB_FAIL(collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE, init_size))) { LOG_WARN("failed to get size", K(ret)); } else { diff --git a/src/sql/engine/expr/ob_expr_collection_construct.cpp b/src/sql/engine/expr/ob_expr_collection_construct.cpp index 90c0f26c81..6d30409cf0 100644 --- a/src/sql/engine/expr/ob_expr_collection_construct.cpp +++ b/src/sql/engine/expr/ob_expr_collection_construct.cpp @@ -291,7 +291,7 @@ int ObExprCollectionConstruct::eval_collection_construct(const ObExpr &expr, int64_t ptr = 0; int64_t init_size = OB_INVALID_SIZE; OZ (collection_type->get_element_type().newx(*coll->get_allocator(), ns, ptr)); - OZ (collection_type->get_element_type().get_size(*ns, pl::PL_TYPE_INIT_SIZE, init_size)); + OZ (collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE, init_size)); OX (new_composite.set_extend(ptr, collection_type->get_element_type().get_type(), init_size)); OX (static_cast(coll->get_data())[i] = new_composite); } else if (pl::PL_OPAQUE_TYPE == v.get_meta().get_extend_type() diff --git a/src/sql/engine/expr/ob_expr_get_package_var.cpp b/src/sql/engine/expr/ob_expr_get_package_var.cpp index 396b6a85a2..9ed7f16c3f 100644 --- a/src/sql/engine/expr/ob_expr_get_package_var.cpp +++ b/src/sql/engine/expr/ob_expr_get_package_var.cpp @@ -60,7 +60,9 @@ int ObExprGetPackageVar::calc(ObObj &result, } else if (OB_ISNULL(pl_engine = session_info->get_pl_engine())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("pl engine is null", K(ret)); - } else if (OB_ISNULL(package_guard = exec_ctx->get_package_guard())) { + } else if (OB_FAIL(exec_ctx->get_package_guard(package_guard))) { + LOG_WARN("get package guard failed", K(ret)); + } else if (OB_ISNULL(package_guard)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("package guard is null", K(ret)); } else if (OB_NOT_NULL(exec_ctx->get_sql_ctx()) diff --git a/src/sql/engine/expr/ob_expr_obj_access.cpp b/src/sql/engine/expr/ob_expr_obj_access.cpp index 83cd6112cf..e396776297 100644 --- a/src/sql/engine/expr/ob_expr_obj_access.cpp +++ b/src/sql/engine/expr/ob_expr_obj_access.cpp @@ -307,13 +307,18 @@ int ObExprObjAccess::ExtraInfo::get_record_attr(const pl::ObObjAccessIdx ¤ CK (OB_NOT_NULL(ns)); OZ (ns->get_user_type(udt_id, user_type)); } else { - pl::ObPLResolveCtx resolve_ctx(alloc, - *ctx.exec_ctx_.get_my_session(), - *ctx.exec_ctx_.get_sql_ctx()->schema_guard_, - *ctx.exec_ctx_.get_package_guard(), - *ctx.exec_ctx_.get_sql_proxy(), - false); - OZ (resolve_ctx.get_user_type(udt_id, user_type)); + pl::ObPLPackageGuard *package_guard = NULL; + OZ (ctx.exec_ctx_.get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + if (OB_SUCC(ret)) { + pl::ObPLResolveCtx resolve_ctx(alloc, + *ctx.exec_ctx_.get_my_session(), + *ctx.exec_ctx_.get_sql_ctx()->schema_guard_, + *package_guard, + *ctx.exec_ctx_.get_sql_proxy(), + false); + OZ (resolve_ctx.get_user_type(udt_id, user_type)); + } } CK (OB_NOT_NULL(user_type)); CK (user_type->is_record_type()); diff --git a/src/sql/engine/expr/ob_expr_object_construct.cpp b/src/sql/engine/expr/ob_expr_object_construct.cpp index b544de775f..f6fc20f316 100644 --- a/src/sql/engine/expr/ob_expr_object_construct.cpp +++ b/src/sql/engine/expr/ob_expr_object_construct.cpp @@ -144,7 +144,7 @@ int ObExprObjectConstruct::newx(ObEvalCtx &ctx, ObObj &result, uint64_t udt_id) OZ (ns->get_user_type(udt_id, user_type, &tmp_alloc)); CK (OB_NOT_NULL(user_type)); OZ (user_type->newx(alloc, ns, ptr)); - OZ (user_type->get_size(*ns, pl::PL_TYPE_INIT_SIZE, init_size)); + OZ (user_type->get_size(pl::PL_TYPE_INIT_SIZE, init_size)); OX (new_composite.set_extend(ptr, user_type->get_type(), init_size)); OX (result = new_composite); } diff --git a/src/sql/engine/expr/ob_expr_pl_associative_index.cpp b/src/sql/engine/expr/ob_expr_pl_associative_index.cpp index c77e7846ad..2945494c4b 100644 --- a/src/sql/engine/expr/ob_expr_pl_associative_index.cpp +++ b/src/sql/engine/expr/ob_expr_pl_associative_index.cpp @@ -209,7 +209,7 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx, CK (OB_NOT_NULL(collection = reinterpret_cast(ptr))); OX (collection->set_count(0)); } - OZ (collection_type->get_element_type().get_size(*pl_exec_ctx, pl::PL_TYPE_INIT_SIZE, init_size)); + OZ (collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE, init_size)); OX (row->set_extend(ptr, collection_type->get_element_type().get_type(), init_size)); } if (OB_SUCC(ret)) { diff --git a/src/sql/engine/expr/ob_expr_subquery_ref.cpp b/src/sql/engine/expr/ob_expr_subquery_ref.cpp index cb1548face..4d874bc9a1 100644 --- a/src/sql/engine/expr/ob_expr_subquery_ref.cpp +++ b/src/sql/engine/expr/ob_expr_subquery_ref.cpp @@ -330,8 +330,7 @@ int ObExprSubQueryRef::expr_eval( ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session(); CK (OB_NOT_NULL(ctx.exec_ctx_.get_sql_ctx())); CK (OB_NOT_NULL(ctx.exec_ctx_.get_sql_ctx()->schema_guard_)); - OZ (pl_type.get_size(pl::ObPLUDTNS(*ctx.exec_ctx_.get_sql_ctx()->schema_guard_), - pl::PL_TYPE_INIT_SIZE, param_size)); + OZ (pl_type.get_size(pl::PL_TYPE_INIT_SIZE, param_size)); CK (OB_NOT_NULL(data = static_cast( ctx.exec_ctx_.get_allocator().alloc(param_size)))); CK (OB_NOT_NULL(obj = static_cast( diff --git a/src/sql/engine/expr/ob_expr_udf.cpp b/src/sql/engine/expr/ob_expr_udf.cpp index e7394218d7..35858248b7 100644 --- a/src/sql/engine/expr/ob_expr_udf.cpp +++ b/src/sql/engine/expr/ob_expr_udf.cpp @@ -741,7 +741,8 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res) false, true, info->loc_, - info->is_called_in_sql_))) { + info->is_called_in_sql_, + info->dblink_id_))) { LOG_WARN("fail to execute udf", K(ret), K(info), K(package_id), K(tmp_result)); if (info->is_called_in_sql_ && OB_NOT_NULL(ctx.exec_ctx_.get_pl_ctx())) { ctx.exec_ctx_.get_pl_ctx()->reset_obj_range_to_end(cur_obj_count); @@ -958,6 +959,7 @@ int ObExprUDFInfo::deep_copy(common::ObIAllocator &allocator, other.loc_ = loc_; other.is_udt_cons_ = is_udt_cons_; other.is_called_in_sql_ = is_called_in_sql_; + other.dblink_id_ = dblink_id_; OZ(other.subprogram_path_.assign(subprogram_path_)); OZ(other.params_type_.assign(params_type_)); OZ(other.params_desc_.assign(params_desc_)); @@ -980,6 +982,7 @@ int ObExprUDFInfo::from_raw_expr(RE &raw_expr) is_udt_udf_ = udf_expr.get_is_udt_udf(); loc_ = udf_expr.get_loc(); is_udt_cons_ = udf_expr.get_is_udt_cons(); + dblink_id_ = udf_expr.get_dblink_id(); return ret; } diff --git a/src/sql/engine/expr/ob_expr_udf.h b/src/sql/engine/expr/ob_expr_udf.h index b06c9e9424..7dae8aea0e 100644 --- a/src/sql/engine/expr/ob_expr_udf.h +++ b/src/sql/engine/expr/ob_expr_udf.h @@ -39,7 +39,8 @@ struct ObExprUDFInfo : public ObIExprExtraInfo public: ObExprUDFInfo(common::ObIAllocator &alloc, ObExprOperatorType type) : ObIExprExtraInfo(alloc, type), - subprogram_path_(alloc), params_type_(alloc), params_desc_(alloc), nocopy_params_(alloc) + subprogram_path_(alloc), params_type_(alloc), params_desc_(alloc), nocopy_params_(alloc), + dblink_id_(OB_INVALID_ID) { } @@ -61,6 +62,7 @@ public: uint64_t loc_; bool is_udt_cons_; bool is_called_in_sql_; + uint64_t dblink_id_; }; class ObSqlCtx; class ObUDFParamDesc; diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index 6628e18fb6..9b4b9a0f36 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -990,6 +990,17 @@ pl::ObPLPackageGuard* ObExecContext::get_package_guard() return package_guard_; } +int ObExecContext::get_package_guard(pl::ObPLPackageGuard *&package_guard) +{ + int ret = OB_SUCCESS; + package_guard = get_package_guard(); + if (OB_ISNULL(package_guard)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get package guard failed", K(ret)); + } + return ret; +} + DEFINE_SERIALIZE(ObExecContext) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index 50a62080e5..90e73f235a 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -343,7 +343,9 @@ public: inline pl::ObPLCtx *get_pl_ctx() { return pl_ctx_; } inline void set_pl_ctx(pl::ObPLCtx *pl_ctx) { pl_ctx_ = pl_ctx; } pl::ObPLPackageGuard* get_package_guard(); - + int get_package_guard(pl::ObPLPackageGuard *&package_guard); + inline pl::ObPLPackageGuard* get_original_package_guard() { return package_guard_; } + inline void set_package_guard(pl::ObPLPackageGuard* v) { package_guard_ = v; } int init_pl_ctx(); ObPartIdRowMapManager& get_part_row_manager() { return part_row_map_manager_; } diff --git a/src/sql/engine/ob_physical_plan.h b/src/sql/engine/ob_physical_plan.h index f09a3827ab..c95f66761d 100644 --- a/src/sql/engine/ob_physical_plan.h +++ b/src/sql/engine/ob_physical_plan.h @@ -331,6 +331,9 @@ public: inline bool has_link_table() const { return has_link_table_; } inline void set_has_link_sfd(bool value) { has_link_sfd_ = value; } inline bool has_link_sfd() const { return has_link_sfd_; } + + inline void set_has_link_udf(bool value) { has_link_udf_ = value; } + inline bool has_link_udf() const { return has_link_udf_; } void set_batch_size(const int64_t v) { batch_size_ = v; } int64_t get_batch_size() const { return batch_size_; } bool is_vectorized() const { return batch_size_ > 0; } @@ -656,6 +659,7 @@ public: bool use_temp_table_; bool has_link_table_; bool has_link_sfd_; + bool has_link_udf_; bool need_serial_exec_;//mark if need serial execute? bool temp_sql_can_prepare_; bool is_need_trans_; diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 5b08b84a8a..28e1ade6af 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -321,6 +321,10 @@ int ObResultSet::start_stmt() } LOG_DEBUG("dblink xa trascaction need skip start_stmt()", K(PHY_LINK_DML == phy_plan->get_root_op_spec()->type_), K(my_session_.get_dblink_context().is_dblink_xa_tras())); } else { + if (phy_plan->has_link_udf() && ac) { + my_session_.set_autocommit(false); + my_session_.set_restore_auto_commit(); + } if (0 < plan_ctx->get_tx_id()) { const transaction::ObTransID tx_id(plan_ctx->get_tx_id()); if (OB_FAIL(sql::ObTMService::recover_tx_for_callback(tx_id, get_exec_context()))) { @@ -1288,7 +1292,8 @@ bool ObResultSet::need_end_trans_callback() const need = ac && !explicit_start_trans && !is_with_rows(); } else if (OB_LIKELY(NULL != physical_plan_) && OB_LIKELY(physical_plan_->is_need_trans()) && - !physical_plan_->is_link_dml_plan()) { + !physical_plan_->is_link_dml_plan() && + !physical_plan_->has_link_udf()) { need = (true == ObSqlTransUtil::plan_can_end_trans(ac, explicit_start_trans)) && (false == ObSqlTransUtil::is_remote_trans(ac, explicit_start_trans, physical_plan_->get_plan_type())); } diff --git a/src/sql/ob_spi.cpp b/src/sql/ob_spi.cpp index 2aa03a80b6..5cf7dfe46d 100644 --- a/src/sql/ob_spi.cpp +++ b/src/sql/ob_spi.cpp @@ -5632,7 +5632,7 @@ int ObSPIService::spi_set_collection(int64_t tenant_id, CK (OB_NOT_NULL(record = reinterpret_cast(ptr))); OX (record->set_null()); } - OZ (collection_type->get_element_type().get_size(*ns, PL_TYPE_INIT_SIZE, init_size)); + OZ (collection_type->get_element_type().get_size(PL_TYPE_INIT_SIZE, init_size)); OX (row->set_extend(ptr, collection_type->get_element_type().get_type(), init_size)); } } @@ -8736,26 +8736,27 @@ int ObSPIService::spi_update_location(pl::ObPLExecCtx *ctx, uint64_t location) } #ifdef OB_BUILD_ORACLE_PL -int ObSPIService::spi_execute_dblink(pl::ObPLExecCtx *ctx, +int ObSPIService::spi_execute_dblink(ObExecContext &exec_ctx, + ObIAllocator &allocator, uint64_t dblink_id, uint64_t package_id, uint64_t proc_id, - ParamStore ¶ms) + ParamStore ¶ms, + ObObj *result) { int ret = OB_SUCCESS; - ObExecContext *exec_ctx = NULL; const ObRoutineInfo *routine_info = NULL; const pl::ObPLDbLinkInfo *dblink_info = NULL; - CK (OB_NOT_NULL(ctx), ctx->valid()); - CK (OB_NOT_NULL(ctx->guard_)); - CK (OB_NOT_NULL(ctx->allocator_)); - CK (OB_NOT_NULL(exec_ctx = ctx->exec_ctx_)); - OZ (ctx->guard_->dblink_guard_.get_dblink_routine_info(dblink_id, package_id, proc_id, routine_info), - dblink_id, package_id, proc_id); + pl::ObPLPackageGuard *package_guard = NULL; + OZ (exec_ctx.get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + OZ (package_guard->dblink_guard_.get_dblink_routine_info(dblink_id, package_id, + proc_id, routine_info), + dblink_id, package_id, proc_id); CK (OB_NOT_NULL(routine_info)); - OZ (ctx->guard_->dblink_guard_.get_dblink_info(dblink_id, dblink_info)); + OZ (package_guard->dblink_guard_.get_dblink_info(dblink_id, dblink_info)); CK (OB_NOT_NULL(dblink_info)); - OZ (spi_execute_dblink(*exec_ctx, *ctx->allocator_, dblink_info, routine_info, params)); + OZ (spi_execute_dblink(exec_ctx, allocator, dblink_info, routine_info, params, result)); return ret; } @@ -8763,7 +8764,8 @@ int ObSPIService::spi_execute_dblink(ObExecContext &exec_ctx, ObIAllocator &allocator, const pl::ObPLDbLinkInfo *dblink_info, const ObRoutineInfo *routine_info, - ParamStore ¶ms) + ParamStore ¶ms, + ObObj *result) { int ret = OB_SUCCESS; sql::DblinkGetConnType conn_type = sql::DblinkGetConnType::DBLINK_POOL; @@ -8793,6 +8795,16 @@ int ObSPIService::spi_execute_dblink(ObExecContext &exec_ctx, } common::ObSEArray udts; ParamStore exec_params((ObWrapperAllocator(allocator))); + ObObj tmp_result; + bool is_print_sql = false; + if (routine_info->is_function()) { + ObExecContext *top_ctx = &exec_ctx; + while (OB_SUCC(ret) && !is_print_sql && NULL != top_ctx) { + CK (OB_NOT_NULL(top_ctx->get_sql_ctx())); + OX (is_print_sql = (stmt::T_SELECT == top_ctx->get_sql_ctx()->stmt_type_)); + OX (top_ctx = top_ctx->get_parent_ctx()); + } + } for (int64_t i = 0; OB_SUCC(ret) && i < params.count(); i++) { ObObjParam param_value; if (params.at(i).is_pl_extend()) { @@ -8807,16 +8819,20 @@ int ObSPIService::spi_execute_dblink(ObExecContext &exec_ctx, } OZ (ObPLDblinkUtil::print_dblink_ps_call_stmt(allocator, dblink_info, call_stmt, params, routine_info, - udts, out_param_idx, out_param_cnt)); + udts, out_param_idx, out_param_cnt, is_print_sql)); OZ (ObTMService::tm_rm_start(exec_ctx, link_type, dblink_conn, tx_id)); OZ (dblink_proxy->dblink_execute_proc(OB_INVALID_TENANT_ID, dblink_conn, allocator, exec_params, call_stmt, *routine_info, udts, - session->get_timezone_info()), call_stmt); - OZ (spi_after_execute_dblink(session, routine_info, allocator, params, exec_params)); + session->get_timezone_info(), &tmp_result), call_stmt); + OZ (spi_after_execute_dblink(session, routine_info, allocator, params, exec_params, result, tmp_result)); + if (OB_SUCC(ret) && NULL != result && !result->is_null() && result->is_ext()) { + CK (OB_NOT_NULL(exec_ctx.get_pl_ctx())); + OZ (exec_ctx.get_pl_ctx()->add(*result)); + } } if (OB_NOT_NULL(dblink_conn)) { int tmp_ret = OB_SUCCESS; - if (OB_FAIL(ObDblinkCtxInSession::revert_dblink_conn(dblink_conn))) { + if (OB_SUCCESS != (tmp_ret = ObDblinkCtxInSession::revert_dblink_conn(dblink_conn))) { ret = (ret == OB_SUCCESS) ? tmp_ret : ret; LOG_WARN("failed to revert dblink conn", K(ret), K(tmp_ret), KP(dblink_conn)); } @@ -8829,12 +8845,15 @@ int ObSPIService::spi_after_execute_dblink(ObSQLSessionInfo *session, const ObRoutineInfo *routine_info, ObIAllocator &allocator, ParamStore ¶ms, - ParamStore &exec_params) + ParamStore &exec_params, + ObObj *result, + ObObj &tmp_result) { int ret = OB_SUCCESS; CK (OB_NOT_NULL(routine_info)); - for (int64_t i = 0; OB_SUCC(ret) && i < routine_info->get_routine_params().count(); i++) { - ObRoutineParam *param = routine_info->get_routine_params().at(i); + + for (int64_t i = 0; OB_SUCC(ret) && i < params.count(); i++) { + ObRoutineParam *param = routine_info->get_routine_params().at(i + routine_info->get_param_start_idx()); if (OB_ISNULL(param)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("param is NULL", K(ret), K(i)); @@ -8844,7 +8863,7 @@ int ObSPIService::spi_after_execute_dblink(ObSQLSessionInfo *session, params.at(i).set_null(); } else { if (ob_is_extend(param->get_param_type().get_obj_type())) { - OZ (ObUserDefinedType::deep_copy_obj(allocator, exec_params.at(i), params.at(i), true)); + OZ (ObUserDefinedType::deep_copy_obj(allocator, exec_params.at(i), params.at(i), true)); ObUserDefinedType::destruct_obj(exec_params.at(i)); } else if (param->get_param_type().get_obj_type() != exec_params.at(i).get_param_meta().get_type()) { const ObDataType &datatype = param->get_param_type(); @@ -8865,6 +8884,30 @@ int ObSPIService::spi_after_execute_dblink(ObSQLSessionInfo *session, ObUserDefinedType::destruct_obj(exec_params.at(i)); } } + if (OB_SUCC(ret) && routine_info->is_function()) { + if (OB_ISNULL(routine_info->get_ret_type()) || OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return type is NULL", K(ret), KPC(routine_info), K(result)); + } else if (ob_is_string_or_lob_type(routine_info->get_ret_type()->get_obj_type()) + && (0 == tmp_result.get_varchar().case_compare(""))) { + result->set_null(); + } else if (ob_is_extend(routine_info->get_ret_type()->get_obj_type())) { + OZ (ObUserDefinedType::deep_copy_obj(allocator, tmp_result, *result, true)); + ObUserDefinedType::destruct_obj(tmp_result); + } else if (tmp_result.get_type() != routine_info->get_ret_type()->get_obj_type()) { + const ObDataType *datatype = routine_info->get_ret_type(); + ObObj convert_obj; + ObExprResType convert_type; + CK (OB_NOT_NULL(datatype)); + OX (convert_type.reset()); + OX (convert_type.set_meta(datatype->get_meta_type())); + OX (convert_type.set_accuracy(datatype->get_accuracy())); + OZ (spi_convert(*session, allocator, tmp_result, convert_type, convert_obj)); + OZ (deep_copy_obj(allocator, convert_obj, *result)); + } else { + OZ (deep_copy_obj(allocator, tmp_result, *result)); + } + } return ret; } diff --git a/src/sql/ob_spi.h b/src/sql/ob_spi.h index d0afce4aac..cb0db4bb02 100644 --- a/src/sql/ob_spi.h +++ b/src/sql/ob_spi.h @@ -742,21 +742,26 @@ public: static int fill_cursor(ObResultSet &result_set, ObSPICursor *cursor, int64_t new_query_start_time); #ifdef OB_BUILD_ORACLE_PL - static int spi_execute_dblink(pl::ObPLExecCtx *ctx, + static int spi_execute_dblink(ObExecContext &exec_ctx, + ObIAllocator &allocator, uint64_t dblink_id, uint64_t package_id, uint64_t proc_id, - ParamStore ¶ms); + ParamStore ¶ms, + ObObj *result); static int spi_execute_dblink(ObExecContext &exec_ctx, ObIAllocator &allocator, const pl::ObPLDbLinkInfo *dblink_info, const ObRoutineInfo *routine_info, - ParamStore ¶ms); + ParamStore ¶ms, + ObObj *result); static int spi_after_execute_dblink(ObSQLSessionInfo *session, const ObRoutineInfo *routine_info, ObIAllocator &allocator, ParamStore ¶ms, - ParamStore &exec_params); + ParamStore &exec_params, + ObObj *result, + ObObj &tmp_result); #endif private: static int recreate_implicit_savapoint_if_need(pl::ObPLExecCtx *ctx, int &result); diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index ab1fec17df..8eb270897b 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -97,6 +97,8 @@ #include "sql/optimizer/ob_explain_log_plan.h" #include "sql/dblink/ob_dblink_utils.h" #include "sql/plan_cache/ob_values_table_compression.h" +#include "pl/ob_pl_stmt.h" +#include "pl/ob_pl_resolver.h" namespace oceanbase { @@ -2727,6 +2729,13 @@ int ObSql::generate_stmt(ParseResult &parse_result, } } } + if (OB_SUCC(ret)) { + if (NULL != resolver_ctx.secondary_namespace_ + && NULL != resolver_ctx.secondary_namespace_->get_external_ns()) { + resolver_ctx.package_guard_ = + &resolver_ctx.secondary_namespace_->get_external_ns()->get_resolve_ctx().package_guard_; + } + } if (OB_SUCC(ret)) { resolver_ctx.is_prepare_protocol_ = context.is_prepare_protocol_; resolver_ctx.is_prepare_stage_ = context.is_prepare_stage_; @@ -3611,7 +3620,7 @@ int ObSql::code_generate( LOG_DEBUG("phy plan", K(*phy_plan)); phy_plan->stat_.is_use_jit_ = use_jit; phy_plan->set_returning(stmt->is_returning()); - phy_plan->set_has_link_table(has_dblink); + phy_plan->set_has_link_table(has_dblink || phy_plan->has_link_udf()); // set plan insert flag : insert into values(..); // value num is n (n >= 1); if (stmt->is_insert_stmt()) { ObInsertStmt *insert_stmt = static_cast(stmt); diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 2a5963f612..2422e26903 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -826,7 +826,8 @@ public: int8_t has_pl_udf_ : 1; // used to mark sql contain pl udf int8_t udf_has_select_stmt_ : 1; // udf has select stmt, not contain other dml stmt int8_t udf_has_dml_stmt_ : 1; // udf has dml stmt - int8_t reserved_:5; + int8_t has_dblink_udf_ : 1; // udf is dblink udf + int8_t reserved_:4; }; }; bool has_dblink_; diff --git a/src/sql/ob_sql_utils.cpp b/src/sql/ob_sql_utils.cpp index 5be2af7715..21cb612ac7 100644 --- a/src/sql/ob_sql_utils.cpp +++ b/src/sql/ob_sql_utils.cpp @@ -762,6 +762,9 @@ int ObSQLUtils::se_calc_const_expr(ObSQLSessionInfo *session, exec_ctx.set_physical_plan_ctx(&phy_plan_ctx); if (NULL != out_ctx) { exec_ctx.set_sql_ctx(out_ctx->get_sql_ctx()); + if (NULL != out_ctx->get_original_package_guard()) { + exec_ctx.set_package_guard(out_ctx->get_original_package_guard()); + } } void *frame_buf = NULL; ObPreCalcExprFrameInfo *pre_calc_frame = NULL; @@ -801,6 +804,10 @@ int ObSQLUtils::se_calc_const_expr(ObSQLSessionInfo *session, } } } + if (NULL != out_ctx && NULL != out_ctx->get_original_package_guard()) { + // avoid out_ctx.package_guard_ be freed + exec_ctx.set_package_guard(NULL); + } } } return ret; diff --git a/src/sql/printer/ob_raw_expr_printer.cpp b/src/sql/printer/ob_raw_expr_printer.cpp index 158c5ef084..9a8e6c5632 100644 --- a/src/sql/printer/ob_raw_expr_printer.cpp +++ b/src/sql/printer/ob_raw_expr_printer.cpp @@ -3245,7 +3245,13 @@ int ObRawExprPrinter::print(ObSysFunRawExpr *expr) break; } case T_FUN_UDF: { + ObUDFRawExpr *udf_expr = static_cast(expr); + CK (OB_NOT_NULL(udf_expr)); PRINT_IDENT_WITH_QUOT(func_name); + if (OB_SUCC(ret) && udf_expr->is_dblink_sys_func()) { + CK (!udf_expr->get_dblink_name().empty()); + DATA_PRINTF("@%.*s", LEN_AND_PTR(udf_expr->get_dblink_name())); + } OZ(inner_print_fun_params(*expr)); break; } @@ -3372,6 +3378,10 @@ do { \ DATA_PRINTF("."); } PRINT_IDENT_WITH_QUOT(expr->get_func_name()); + if (expr->is_dblink_sys_func()) { + CK (!expr->get_dblink_name().empty()); + DATA_PRINTF("@%.*s", LEN_AND_PTR(expr->get_dblink_name())); + } DATA_PRINTF("("); ObIArray ¶ms_type = expr->get_params_type(); diff --git a/src/sql/resolver/cmd/ob_call_procedure_resolver.cpp b/src/sql/resolver/cmd/ob_call_procedure_resolver.cpp index 3561c37e37..3a2661e2ab 100644 --- a/src/sql/resolver/cmd/ob_call_procedure_resolver.cpp +++ b/src/sql/resolver/cmd/ob_call_procedure_resolver.cpp @@ -341,6 +341,12 @@ int ObCallProcedureResolver::resolve(const ParseNode &parse_tree) } else if (NULL != stmt->get_call_proc_info()) { // find call procedure info in pl cache. } else { + if (NULL == params_.package_guard_) { + pl::ObPLPackageGuard *package_guard = NULL; + OZ (params_.session_info_->get_cur_exec_ctx()->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + OX (params_.package_guard_ = package_guard); + } OZ (ObCacheObjectFactory::alloc(stmt->get_cacheobj_guard(), ObLibCacheNameSpace::NS_CALLSTMT, session_info_->get_effective_tenant_id())); @@ -382,13 +388,12 @@ int ObCallProcedureResolver::resolve(const ParseNode &parse_tree) } } ObSEArray expr_params; - pl::ObPLPackageGuard package_guard(params_.session_info_->get_effective_tenant_id()); // 获取routine schem info if (OB_SUCC(ret)) { if (OB_NOT_NULL(params_node) && OB_FAIL(resolve_param_exprs(params_node, expr_params))) { LOG_WARN("failed to resolve param exprs", K(ret)); - } else if (OB_FAIL(ObResolverUtils::get_routine(package_guard, + } else if (OB_FAIL(ObResolverUtils::get_routine(*params_.package_guard_, params_, (*session_info_).get_effective_tenant_id(), (*session_info_).get_database_name(), @@ -466,7 +471,7 @@ int ObCallProcedureResolver::resolve(const ParseNode &parse_tree) *(params_.sql_proxy_), pl_type, NULL, - &package_guard.dblink_guard_)); + ¶ms_.package_guard_->dblink_guard_)); } if (OB_SUCC(ret)) { if (param_info->is_out_sp_param() || param_info->is_inout_sp_param()) { @@ -543,6 +548,10 @@ int ObCallProcedureResolver::resolve(const ParseNode &parse_tree) } if (OB_SUCC(ret) && OB_NOT_NULL(proc_info) && (OB_INVALID_ID != proc_info->get_dblink_id())) { stmt->set_dblink_routine_info(proc_info); + if (proc_info->is_function()) { + ret = OB_ERR_NOT_VALID_ROUTINE_NAME; + LOG_WARN("ORA-06576: not a valid function or procedure name", K(ret), KPC(proc_info)); + } } // Step 4: cg raw expr OX (call_proc_info->set_param_cnt(params.count())); diff --git a/src/sql/resolver/ddl/ob_create_routine_resolver.cpp b/src/sql/resolver/ddl/ob_create_routine_resolver.cpp index 86488a3750..db8bf366a9 100644 --- a/src/sql/resolver/ddl/ob_create_routine_resolver.cpp +++ b/src/sql/resolver/ddl/ob_create_routine_resolver.cpp @@ -390,7 +390,8 @@ int ObCreateRoutineResolver::resolve_param_type(const ParseNode *type_node, *(schema_checker_->get_schema_guard()), *(params_.sql_proxy_), obj_access_idents, - access_idxs))) { + access_idxs, + params_.package_guard_))) { // maybe dependent object not exist yet! LOG_WARN("failed to transform from iparam", K(ret)); if (ObPLResolver::is_object_not_exist_error(ret)) { @@ -464,7 +465,8 @@ int ObCreateRoutineResolver::resolve_param_type(const ParseNode *type_node, *(schema_checker_->get_schema_guard()), *(params_.sql_proxy_), obj_access_idents, - access_idxs))) { + access_idxs, + params_.package_guard_))) { // maybe dependent object not exist yet! LOG_WARN("failed to transform from iparam", K(ret)); if (ObPLResolver::is_object_not_exist_error(ret)) { diff --git a/src/sql/resolver/dml/ob_del_upd_stmt.cpp b/src/sql/resolver/dml/ob_del_upd_stmt.cpp index 26137feed5..ee1d025ea5 100644 --- a/src/sql/resolver/dml/ob_del_upd_stmt.cpp +++ b/src/sql/resolver/dml/ob_del_upd_stmt.cpp @@ -238,7 +238,8 @@ int ObInsertTableInfo::iterate_stmt_expr(ObStmtExprVisitor &visitor) ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret)); } else if ((values_vector_.at(i)->has_flag(CNT_SUB_QUERY) || - values_vector_.at(i)->has_flag(CNT_ONETIME)) && + values_vector_.at(i)->has_flag(CNT_ONETIME) || + values_vector_.at(i)->has_flag(CNT_PL_UDF)) && OB_FAIL(visitor.visit(values_vector_.at(i), SCOPE_INSERT_VECTOR))) { LOG_WARN("failed to add expr to expr checker", K(ret)); } else { /*do nothing*/ } diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index 29e27154fb..1831a8aa42 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -4725,13 +4725,17 @@ int ObDMLResolver::resolve_function_table_item(const ParseNode &parse_tree, // PL collection used in TABLE(), extract PL info from schema CK(OB_NOT_NULL(schema_checker_)); if (OB_SUCC(ret)) { - ObPLPackageGuard package_guard(params_.session_info_->get_effective_tenant_id()); + pl::ObPLPackageGuard *package_guard = NULL; const ObUserDefinedType *user_type = NULL; CK (OB_NOT_NULL(params_.schema_checker_)); + CK (OB_NOT_NULL(session_info_)); + CK (OB_NOT_NULL(session_info_->get_cur_exec_ctx())); + OZ (session_info_->get_cur_exec_ctx()->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); OZ (ObResolverUtils::get_user_type( params_.allocator_, params_.session_info_, params_.sql_proxy_, params_.schema_checker_->get_schema_guard(), - package_guard, + *package_guard, function_table_expr->get_udt_id(), user_type)); if (OB_FAIL(ret)) { @@ -4745,6 +4749,9 @@ int ObDMLResolver::resolve_function_table_item(const ParseNode &parse_tree, LOG_WARN("ORA-22905: cannot access rows from a non-nested table item", K(ret), K(function_table_expr->get_result_type())); LOG_USER_ERROR(OB_NOT_SUPPORTED, "access rows from a non-nested table item"); + } else if (is_dblink_type_id(user_type->get_user_type_id())) { + ret = OB_ERR_INVALID_DATATYPE; + LOG_WARN("user type can not be dblink type in table function", K(ret)); } else if (OB_FAIL(add_udt_dependency(*user_type))) { LOG_WARN("failed to add udt dependency", K(ret), KPC(user_type)); } @@ -11371,6 +11378,10 @@ int ObDMLResolver::collect_schema_version(ObRawExpr *expr) *expr, *stmt_->get_query_ctx())); OX (stmt_->get_query_ctx()->disable_udf_parallel_ |= !udf_expr->is_parallel_enable()); + OX (stmt_->get_query_ctx()->disable_udf_parallel_ |= is_valid_id(udf_expr->get_dblink_id())); + OX (stmt_->get_query_ctx()->has_dblink_udf_ |= is_valid_id(udf_expr->get_dblink_id())); + OX (stmt_->get_query_ctx()->set_has_dblink( + stmt_->get_query_ctx()->has_dblink() || is_valid_id(udf_expr->get_dblink_id()))); OX (stmt_->get_query_ctx()->has_pl_udf_ = true); if (OB_SUCC(ret) && T_FIELD_LIST_SCOPE == current_scope_ && @@ -11448,6 +11459,7 @@ int ObDMLResolver::resolve_external_name(ObQualifiedName &q_name, columns, real_exprs, expr, + params_.package_guard_, params_.is_prepare_protocol_, false, /*is_check_mode*/ current_scope_ != T_CURRENT_OF_SCOPE /*is_sql_scope*/))) { diff --git a/src/sql/resolver/dml/ob_dml_stmt.cpp b/src/sql/resolver/dml/ob_dml_stmt.cpp index d7aa70b06f..b72e0cabdf 100644 --- a/src/sql/resolver/dml/ob_dml_stmt.cpp +++ b/src/sql/resolver/dml/ob_dml_stmt.cpp @@ -3963,6 +3963,24 @@ int ObDMLStmt::get_sequence_exprs(ObIArray &exprs) const return ret; } +int ObDMLStmt::get_udf_exprs(common::ObIArray &exprs) const +{ + int ret = OB_SUCCESS; + ObSEArray relation_exprs; + if (OB_FAIL(get_relation_exprs(relation_exprs))) { + LOG_WARN("failed to get relation exprs", K(ret)); + } + ObRawExpr *cur_expr = NULL; + for (int64_t i = 0; OB_SUCC(ret) && i < relation_exprs.count(); i++) { + if (OB_ISNULL(cur_expr = relation_exprs.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get null expr", K(ret)); + } else if (OB_FAIL(ObTransformUtils::extract_udf_exprs(cur_expr, exprs))) { + LOG_WARN("failed to extract udf exprs", K(ret)); + } + } + return ret; +} int ObDMLStmt::find_var_assign_in_query_ctx(bool &is_found) const { int ret = OB_SUCCESS; diff --git a/src/sql/resolver/dml/ob_dml_stmt.h b/src/sql/resolver/dml/ob_dml_stmt.h index 9f19408629..2a8efcff5f 100644 --- a/src/sql/resolver/dml/ob_dml_stmt.h +++ b/src/sql/resolver/dml/ob_dml_stmt.h @@ -1084,6 +1084,7 @@ public: const common::ObString seq_action, // NEXTVAL or CURRVAL const uint64_t seq_id) const; int get_sequence_exprs(common::ObIArray &exprs) const; + int get_udf_exprs(common::ObIArray &exprs) const; int has_rand(bool &has_rand) const { return has_special_expr(CNT_RAND_FUNC, has_rand); } virtual int has_special_expr(const ObExprInfoFlag, bool &has) const; const TransposeItem *get_transpose_item() const { return transpose_item_; } diff --git a/src/sql/resolver/expr/ob_raw_expr.cpp b/src/sql/resolver/expr/ob_raw_expr.cpp index bc0999de0e..1c01818763 100644 --- a/src/sql/resolver/expr/ob_raw_expr.cpp +++ b/src/sql/resolver/expr/ob_raw_expr.cpp @@ -4203,8 +4203,22 @@ int ObSysFunRawExpr::get_name_internal(char *buf, const int64_t buf_len, int64_t if (T_FUN_SYS_AUTOINC_NEXTVAL == get_expr_type() && OB_FAIL(get_autoinc_nextval_name(buf, buf_len, pos))) { LOG_WARN("fail to get_autoinc_nextval_name", K(ret)); - } else if (OB_FAIL(BUF_PRINTF("%.*s(", get_func_name().length(), get_func_name().ptr()))) { + } else if (OB_FAIL(BUF_PRINTF("%.*s", get_func_name().length(), get_func_name().ptr()))) { LOG_WARN("fail to BUF_PRINTF", K(ret)); + } else { + if (is_valid_id(get_dblink_id())) { + if (get_dblink_name().empty()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dblink name is empty", K(ret), KPC(this)); + } else if (OB_FAIL(BUF_PRINTF("@%.*s", get_dblink_name().length(), get_dblink_name().ptr()))) { + LOG_WARN("failed to print dblink name", K(ret), K(get_dblink_name())); + } + } + if (OB_SUCC(ret) && OB_FAIL(BUF_PRINTF("("))) { + LOG_WARN("fail to BUF_PRINTF", K(ret)); + } + } + if (OB_FAIL(ret)) { } else if (T_FUN_COLUMN_CONV == get_expr_type()) { if (OB_FAIL(get_column_conv_name(buf, buf_len, pos, type))) { LOG_WARN("fail to get_column_conv_name", K(ret)); diff --git a/src/sql/resolver/expr/ob_raw_expr.h b/src/sql/resolver/expr/ob_raw_expr.h index 0869a408a6..222a13854e 100644 --- a/src/sql/resolver/expr/ob_raw_expr.h +++ b/src/sql/resolver/expr/ob_raw_expr.h @@ -950,6 +950,7 @@ struct ObUDFInfo udf_name_(), udf_package_(), udf_database_(), + dblink_name_(), param_names_(), param_exprs_(), udf_param_num_(0), @@ -997,6 +998,7 @@ struct ObUDFInfo TO_STRING_KV(K_(udf_name), K_(udf_package), K_(udf_database), + K_(dblink_name), K_(param_names), K_(param_exprs), K_(udf_param_num), @@ -1009,6 +1011,7 @@ struct ObUDFInfo common::ObString udf_name_; common::ObString udf_package_; common::ObString udf_database_; + common::ObString dblink_name_; common::ObArray param_names_; common::ObArray param_exprs_; int64_t udf_param_num_; @@ -1188,6 +1191,12 @@ public: } return bret; } + inline bool is_dblink_udf() const + { + bool bret = !access_idents_.empty() + && access_idents_.at(access_idents_.count() - 1).is_pl_udf(); + return bret && !dblink_name_.empty(); + } inline bool is_udf_return_access() const { bool bret = !access_idents_.empty() diff --git a/src/sql/resolver/expr/ob_raw_expr_resolver_impl.cpp b/src/sql/resolver/expr/ob_raw_expr_resolver_impl.cpp index 9f5de1ac2d..e389b1c724 100644 --- a/src/sql/resolver/expr/ob_raw_expr_resolver_impl.cpp +++ b/src/sql/resolver/expr/ob_raw_expr_resolver_impl.cpp @@ -1185,6 +1185,12 @@ int ObRawExprResolverImpl::do_recursive_resolve(const ParseNode *node, ObRawExpr } break; } + case T_DBLINK_UDF: { + if (OB_FAIL(process_dblink_udf_node(node, expr))) { + LOG_WARN("failed to process dblink udf node", K(ret), K(node)); + } + break; + } default: ret = OB_ERR_PARSER_SYNTAX; LOG_WARN("Wrong type in expression", K(get_type_name(node->type_))); @@ -1393,9 +1399,31 @@ int ObRawExprResolverImpl::process_remote_sequence_node(const ParseNode *node, O ObColumnRefRawExpr *b_expr = NULL; uint64_t tenant_id = ctx_.session_info_->get_effective_tenant_id(); const ObDbLinkSchema *dblink_schema = NULL; - if (OB_FAIL(ctx_.schema_checker_->get_dblink_schema(tenant_id, - column_ref.dblink_name_, - dblink_schema))) { + for (int64_t i = 0; OB_SUCC(ret) && i < 3; i++) { + if (OB_NOT_NULL(node->children_[i])) { + if (T_IDENT != node->children_[i]->type_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child type error", K(ret), K(i), K(node->children_[i]->type_)); + } else { + ObString ident_name(static_cast(node->children_[i]->str_len_), node->children_[i]->str_value_); + if (OB_FAIL(column_ref.access_idents_.push_back(ObObjAccessIdent(ident_name, OB_INVALID_INDEX)))) { + LOG_WARN("push back failed", K(ret), K(column_ref)); + } + } + } + } + int64_t acc_cnt = column_ref.access_idents_.count(); + if (OB_FAIL(ret)) { + } else if (acc_cnt > 0 + && (0 != column_ref.access_idents_.at(acc_cnt - 1).access_name_.case_compare("NEXTVAL") + && (0 != column_ref.access_idents_.at(acc_cnt - 1).access_name_.case_compare("CURRVAL")) + && lib::is_oracle_mode())) { + if (OB_FAIL(resolve_dblink_udf_expr(NULL, column_ref, expr))) { + LOG_WARN("resolve dblink udf expr failed", K(ret), K(column_ref)); + } + } else if (OB_FAIL(ctx_.schema_checker_->get_dblink_schema(tenant_id, + column_ref.dblink_name_, + dblink_schema))) { LOG_WARN("failed to get dblink schema", K(ret)); } else if (OB_ISNULL(dblink_schema)) { ret = OB_DBLINK_NOT_EXIST_TO_ACCESS; @@ -1422,6 +1450,113 @@ int ObRawExprResolverImpl::process_remote_sequence_node(const ParseNode *node, O return ret; } +int ObRawExprResolverImpl::process_dblink_udf_node(const ParseNode *node, ObRawExpr *&expr) +{ + int ret = OB_SUCCESS; + ObQualifiedName column_ref; + CK (OB_NOT_NULL(node)); + CK (OB_NOT_NULL(ctx_.columns_)); + CK (OB_NOT_NULL(ctx_.session_info_)); + CK (OB_NOT_NULL(ctx_.schema_checker_)); + OV (T_DBLINK_UDF == node->type_, OB_ERR_UNEXPECTED, K(node->type_)); + CK (5 == node->num_child_); + CK (OB_NOT_NULL(node->children_[2])); + CK (OB_NOT_NULL(node->children_[3])); + // resolve a.b.c + for (int64_t i = 0; OB_SUCC(ret) && i < 3; i++) { + if (OB_NOT_NULL(node->children_[i])) { + if (T_IDENT != node->children_[i]->type_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child type error", K(ret), K(i), K(node->children_[i]->type_)); + } else { + ObString ident_name(static_cast(node->children_[i]->str_len_), node->children_[i]->str_value_); + OZ (column_ref.access_idents_.push_back(ObObjAccessIdent(ident_name, OB_INVALID_INDEX)), K(column_ref)); + } + } + } + if (OB_SUCC(ret) && column_ref.access_idents_.count() >= 2) { + int64_t acc_cnt = column_ref.access_idents_.count(); + if (0 == column_ref.access_idents_.at(acc_cnt - 1).access_name_.case_compare("NEXTVAL") + || 0 == column_ref.access_idents_.at(acc_cnt - 1).access_name_.case_compare("CURRVAL")) { + ret = OB_ERR_SEQ_NOT_ALLOWED_HERE; + LOG_WARN("ORA-02287: sequence number not allowed here", K(ret), K(column_ref)); + } + } + OV (column_ref.access_idents_.count() >= 1); + OX (column_ref.dblink_name_.assign_ptr(const_cast(node->children_[3]->str_value_), + static_cast(node->children_[3]->str_len_))); + OZ (resolve_dblink_udf_expr(node->children_[4], column_ref, expr)); + return ret; +} + +int ObRawExprResolverImpl::resolve_dblink_udf_expr(const ParseNode *node, + ObQualifiedName &column_ref, + ObRawExpr *&expr) +{ + int ret = OB_SUCCESS; + ObObjAccessIdent &access_ident = column_ref.access_idents_.at(column_ref.access_idents_.count() - 1); + ObUDFInfo &udf_info = access_ident.udf_info_; + ObUDFRawExpr *func_expr = NULL; + access_ident.type_ = PL_UDF; + OX (udf_info.udf_name_.assign_ptr(access_ident.access_name_.ptr(), access_ident.access_name_.length())); + OZ (ctx_.expr_factory_.create_raw_expr(T_FUN_UDF, func_expr)); + CK (OB_NOT_NULL(func_expr)); + // resolve param list + if (OB_SUCC(ret) && NULL != node) { + bool has_assign_expr = false; + for (int64_t i = 0; OB_SUCC(ret) && i < node->num_child_; i++) { + ObRawExpr *param_expr = NULL; + const ParseNode *param_node = node->children_[i]; + CK (OB_NOT_NULL(param_node)); + if (OB_SUCC(ret) && T_SP_CPARAM == param_node->type_) { + if (T_IDENT != param_node->children_[0]->type_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid param name node", K(ret), K(param_node->children_[0]->type_)); + } else { + has_assign_expr = true; + ObString param_name(static_cast(param_node->children_[0]->str_len_), + param_node->children_[0]->str_value_); + OV (!param_name.empty(), OB_INVALID_ARGUMENT); + OZ (udf_info.param_names_.push_back(param_name)); + OZ (SMART_CALL(recursive_resolve(param_node->children_[1], param_expr))); + OZ (udf_info.param_exprs_.push_back(param_expr)); + } + } else if (has_assign_expr) { + ret = OB_ERR_POSITIONAL_FOLLOW_NAME; + LOG_WARN("a positional parameter association may not follow a named", K(ret)); + } else { + OZ (SMART_CALL(recursive_resolve(node->children_[i], param_expr))); + OX (udf_info.udf_param_num_++); + OZ (access_ident.params_.push_back(std::make_pair(param_expr, 0)), KPC(param_expr), K(i)); + OZ (func_expr->add_param_expr(access_ident.params_.at(i).first)); + } + } + } + if (OB_SUCC(ret) && NULL != ctx_.query_ctx_) { + ctx_.query_ctx_->has_udf_ = true; + for (int64_t i = 0; OB_SUCC(ret) && i < ctx_.query_ctx_->all_user_variable_.count(); i++) { + OV (OB_NOT_NULL(ctx_.query_ctx_->all_user_variable_.at(i)), OB_ERR_UNEXPECTED, i); + OX (ctx_.query_ctx_->all_user_variable_.at(i)->set_query_has_udf(true)); + } + } + if (OB_SUCC(ret) && NULL != func_expr) { + func_expr->set_func_name(udf_info.udf_name_); + udf_info.ref_expr_ = func_expr; + OZ (func_expr->extract_info(), KPC(func_expr)); + } + if (OB_SUCC(ret)) { + column_ref.format_qualified_name(ctx_.case_mode_); + column_ref.parents_expr_info_ = ctx_.parents_expr_info_; + ObColumnRefRawExpr *b_expr = NULL; + OZ (ctx_.expr_factory_.create_raw_expr(T_REF_COLUMN, b_expr)); + OV (OB_NOT_NULL(b_expr)); + OX (column_ref.ref_expr_ = b_expr); + OZ (ctx_.columns_->push_back(column_ref)); + OX (expr = b_expr); + } + return ret; +} + int ObRawExprResolverImpl::process_xml_attributes_node(const ParseNode *node, ObRawExpr *&expr) { INIT_SUCC(ret); CK(OB_NOT_NULL(node)); @@ -1873,6 +2008,7 @@ int ObRawExprResolverImpl::check_pl_variable(ObQualifiedName &q_name, bool &is_p fake_columns, fake_exprs, var, + &ctx_.secondary_namespace_->get_external_ns()->get_resolve_ctx().package_guard_, false,/*is_prepare_protocol*/ true,/*is_check_mode*/ ctx_.current_scope_ != T_PL_SCOPE /*is_sql_scope*/))) { diff --git a/src/sql/resolver/expr/ob_raw_expr_resolver_impl.h b/src/sql/resolver/expr/ob_raw_expr_resolver_impl.h index 73b35a07e2..d9ff1bd9af 100644 --- a/src/sql/resolver/expr/ob_raw_expr_resolver_impl.h +++ b/src/sql/resolver/expr/ob_raw_expr_resolver_impl.h @@ -208,6 +208,10 @@ private: int process_xmlparse_node(const ParseNode *node, ObRawExpr *&expr); void get_special_func_ident_name(ObString &ident_name, const ObItemType func_type); int process_remote_sequence_node(const ParseNode *node, ObRawExpr *&expr); + int process_dblink_udf_node(const ParseNode *node, ObRawExpr *&expr); + int resolve_dblink_udf_expr(const ParseNode *node, + ObQualifiedName &column_ref, + ObRawExpr *&expr); private: int process_sys_func_params(ObSysFunRawExpr &func_expr, int current_columns_count); diff --git a/src/sql/resolver/expr/ob_raw_expr_util.cpp b/src/sql/resolver/expr/ob_raw_expr_util.cpp index b956e321af..683fea82c2 100644 --- a/src/sql/resolver/expr/ob_raw_expr_util.cpp +++ b/src/sql/resolver/expr/ob_raw_expr_util.cpp @@ -789,7 +789,9 @@ int ObRawExprUtils::resolve_udf_common_info(const ObString &db_name, bool is_pkg_body_udf, bool is_pl_agg, int64_t type_id, - ObUDFInfo &udf_info) + ObUDFInfo &udf_info, + uint64_t dblink_id, + const ObString &dblink_name) { int ret = OB_SUCCESS; ObUDFRawExpr *udf_raw_expr = udf_info.ref_expr_; @@ -806,6 +808,8 @@ int ObRawExprUtils::resolve_udf_common_info(const ObString &db_name, OX (udf_raw_expr->set_pkg_body_udf(is_pkg_body_udf)); OX (udf_raw_expr->set_type_id(type_id)); OX (udf_raw_expr->set_is_aggregate_udf(is_pl_agg)); + OX (udf_raw_expr->set_dblink_id(dblink_id)); + OX (udf_raw_expr->set_dblink_name(dblink_name)); return ret; } @@ -814,7 +818,8 @@ int ObRawExprUtils::resolve_udf_param_types(const ObIRoutineInfo* func_info, sql::ObSQLSessionInfo &session_info, common::ObIAllocator &allocator, common::ObMySQLProxy &sql_proxy, - ObUDFInfo &udf_info) + ObUDFInfo &udf_info, + pl::ObPLDbLinkGuard &dblink_guard) { int ret = OB_SUCCESS; @@ -855,7 +860,9 @@ int ObRawExprUtils::resolve_udf_param_types(const ObIRoutineInfo* func_info, session_info, allocator, sql_proxy, - ret_pl_type)); + ret_pl_type, + NULL, + &dblink_guard)); } else { OX (ret_pl_type = ret_param->get_pl_data_type()); } @@ -901,7 +908,9 @@ int ObRawExprUtils::resolve_udf_param_types(const ObIRoutineInfo* func_info, session_info, allocator, sql_proxy, - param_pl_type)); + param_pl_type, + NULL, + &dblink_guard)); } else { OX (param_pl_type = iparam->get_pl_data_type()); } @@ -1266,21 +1275,26 @@ int ObRawExprUtils::resolve_udf_info(common::ObIAllocator &allocator, ObUDFInfo &udf_info) { int ret = OB_SUCCESS; - pl::ObPLPackageGuard dummy_pkg_guard(session_info.get_effective_tenant_id()); - pl::ObPLResolver pl_resolver(allocator, - session_info, - schema_guard, - dummy_pkg_guard, - *GCTX.sql_proxy_, - expr_factory, - NULL, - false); - HEAP_VAR(pl::ObPLFunctionAST, func_ast, allocator) { - ObSEArray access_idxs; - if (OB_FAIL(pl_resolver.init(func_ast))) { - LOG_WARN("pl resolver init failed", K(ret)); - } else if (OB_FAIL(pl_resolver.resolve_udf_info(udf_info, access_idxs, func_ast))) { - LOG_WARN("failed to resolve udf info", K(ret)); + pl::ObPLPackageGuard *package_guard = NULL; + CK (OB_NOT_NULL(session_info.get_cur_exec_ctx())); + OZ (session_info.get_cur_exec_ctx()->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + if (OB_SUCC(ret)) { + pl::ObPLResolver pl_resolver(allocator, + session_info, + schema_guard, + *package_guard, + *GCTX.sql_proxy_, + expr_factory, + NULL, + false); + HEAP_VAR(pl::ObPLFunctionAST, func_ast, allocator) { + ObSEArray access_idxs; + if (OB_FAIL(pl_resolver.init(func_ast))) { + LOG_WARN("pl resolver init failed", K(ret)); + } else if (OB_FAIL(pl_resolver.resolve_udf_info(udf_info, access_idxs, func_ast))) { + LOG_WARN("failed to resolve udf info", K(ret)); + } } } return ret; diff --git a/src/sql/resolver/expr/ob_raw_expr_util.h b/src/sql/resolver/expr/ob_raw_expr_util.h index 2f9108f2b3..9892fd2d2d 100644 --- a/src/sql/resolver/expr/ob_raw_expr_util.h +++ b/src/sql/resolver/expr/ob_raw_expr_util.h @@ -860,13 +860,16 @@ public: bool is_pkg_body_udf, bool is_pl_agg, int64_t type_id, - ObUDFInfo &udf_info); + ObUDFInfo &udf_info, + uint64_t dblink_id, + const ObString &dblink_name); static int resolve_udf_param_types(const share::schema::ObIRoutineInfo* func_info, share::schema::ObSchemaGetterGuard &schema_guard, sql::ObSQLSessionInfo &session_info, common::ObIAllocator &allocator, common::ObMySQLProxy &sql_proxy, - ObUDFInfo &udf_info); + ObUDFInfo &udf_info, + pl::ObPLDbLinkGuard &dblink_guard); static int resolve_udf_param_exprs(ObResolverParams ¶ms, const share::schema::ObIRoutineInfo *func_info, ObUDFInfo &udf_info); diff --git a/src/sql/resolver/ob_resolver_define.h b/src/sql/resolver/ob_resolver_define.h index e2339ecca7..a28b75b820 100644 --- a/src/sql/resolver/ob_resolver_define.h +++ b/src/sql/resolver/ob_resolver_define.h @@ -249,6 +249,7 @@ inline int databuff_print_key_obj(char *buf, const int64_t buf_len, int64_t &pos namespace pl { class ObPLBlockNS; +class ObPLPackageGuard; } namespace sql @@ -350,7 +351,8 @@ struct ObResolverParams need_check_col_dup_(true), is_specified_col_name_(false), is_in_sys_view_(false), - is_expanding_view_(false) + is_expanding_view_(false), + package_guard_(NULL) {} bool is_force_trace_log() { return force_trace_log_; } @@ -419,6 +421,7 @@ public: bool is_specified_col_name_;//mark if specify the column name in create view or create table as.. bool is_in_sys_view_; bool is_expanding_view_; + pl::ObPLPackageGuard *package_guard_; }; } // end namespace sql } // end namespace oceanbase diff --git a/src/sql/resolver/ob_resolver_utils.cpp b/src/sql/resolver/ob_resolver_utils.cpp index c3d5a8eb31..317909bbf9 100644 --- a/src/sql/resolver/ob_resolver_utils.cpp +++ b/src/sql/resolver/ob_resolver_utils.cpp @@ -1323,7 +1323,6 @@ int ObResolverUtils::get_routine(pl::ObPLPackageGuard &package_guard, resolve_ctx.params_.secondary_namespace_ = params.secondary_namespace_; resolve_ctx.params_.param_list_ = params.param_list_; resolve_ctx.params_.is_execute_call_stmt_ = params.is_execute_call_stmt_; - OZ (package_guard.init()); if (dblink_name.empty()) { OZ (get_routine(resolve_ctx, tenant_id, @@ -3012,6 +3011,7 @@ int ObResolverUtils::resolve_columns_for_const_expr(ObRawExpr *&expr, ObArray &columns, ObIArray &real_exprs, ObRawExpr *&expr, + pl::ObPLPackageGuard *package_guard, bool is_prepare_protocol, bool is_check_mode, bool is_sql_scope) { int ret = OB_SUCCESS; - pl::ObPLPackageGuard dummy_pkg_guard(session_info.get_effective_tenant_id()); - pl::ObPLResolver pl_resolver(allocator, - session_info, - schema_guard, - NULL == ns ? dummy_pkg_guard : ns->get_external_ns()->get_resolve_ctx().package_guard_, - NULL == sql_proxy ? (NULL == ns ? *GCTX.sql_proxy_ : ns->get_external_ns()->get_resolve_ctx().sql_proxy_) : *sql_proxy, - expr_factory, - NULL == ns ? NULL : ns->get_external_ns()->get_parent_ns(), - is_prepare_protocol, - is_check_mode, - is_sql_scope, - NULL/*param store*/, - extern_param_info); - HEAP_VAR(pl::ObPLFunctionAST, func_ast, allocator) { - if (OB_FAIL(pl_resolver.init(func_ast))) { - LOG_WARN("pl resolver init failed", K(ret)); - } else if (NULL != ns) { - pl_resolver.get_current_namespace() = *ns; - } else { /*do nothing*/ } + if (NULL == package_guard) { + CK (OB_NOT_NULL(session_info.get_cur_exec_ctx())); + OZ (session_info.get_cur_exec_ctx()->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + } + if (OB_SUCC(ret)) { + pl::ObPLResolver pl_resolver(allocator, + session_info, + schema_guard, + *package_guard, + NULL == sql_proxy ? (NULL == ns ? *GCTX.sql_proxy_ : ns->get_external_ns()->get_resolve_ctx().sql_proxy_) : *sql_proxy, + expr_factory, + NULL == ns ? NULL : ns->get_external_ns()->get_parent_ns(), + is_prepare_protocol, + is_check_mode, + is_sql_scope, + NULL/*param store*/, + extern_param_info); + HEAP_VAR(pl::ObPLFunctionAST, func_ast, allocator) { + if (OB_FAIL(pl_resolver.init(func_ast))) { + LOG_WARN("pl resolver init failed", K(ret)); + } else if (NULL != ns) { + pl_resolver.get_current_namespace() = *ns; + } else { /*do nothing*/ } - if (OB_SUCC(ret)) { - if (OB_FAIL(pl_resolver.resolve_qualified_name(q_name, columns, real_exprs, func_ast, expr))) { - if (is_check_mode) { - LOG_INFO("failed to resolve var", K(q_name), K(ret)); - } else { - LOG_WARN_IGNORE_COL_NOTFOUND(ret, "failed to resolve var", K(q_name), K(ret)); - } - } else if (OB_ISNULL(expr)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("Invalid expr", K(expr), K(ret)); - } else if (!expr->is_const_raw_expr() - && !expr->is_obj_access_expr() - && !expr->is_sys_func_expr() - && !expr->is_udf_expr() - && T_FUN_PL_GET_CURSOR_ATTR != expr->get_expr_type()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("expr type is invalid", K(expr->get_expr_type())); - } else if (OB_NOT_NULL(ns) && OB_NOT_NULL(ns->get_external_ns())) { - ObPLDependencyTable &src_dep_tbl = func_ast.get_dependency_table(); - for (int64_t i = 0; OB_SUCC(ret) && i < src_dep_tbl.count(); ++i) { - OZ (ns->get_external_ns()->add_dependency_object(src_dep_tbl.at(i))); + if (OB_SUCC(ret)) { + if (OB_FAIL(pl_resolver.resolve_qualified_name(q_name, columns, real_exprs, func_ast, expr))) { + if (is_check_mode) { + LOG_INFO("failed to resolve var", K(q_name), K(ret)); + } else { + LOG_WARN_IGNORE_COL_NOTFOUND(ret, "failed to resolve var", K(q_name), K(ret)); + } + } else if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Invalid expr", K(expr), K(ret)); + } else if (!expr->is_const_raw_expr() + && !expr->is_obj_access_expr() + && !expr->is_sys_func_expr() + && !expr->is_udf_expr() + && T_FUN_PL_GET_CURSOR_ATTR != expr->get_expr_type()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("expr type is invalid", K(expr->get_expr_type())); + } else if (OB_NOT_NULL(ns) && OB_NOT_NULL(ns->get_external_ns())) { + ObPLDependencyTable &src_dep_tbl = func_ast.get_dependency_table(); + for (int64_t i = 0; OB_SUCC(ret) && i < src_dep_tbl.count(); ++i) { + OZ (ns->get_external_ns()->add_dependency_object(src_dep_tbl.at(i))); + } } } } diff --git a/src/sql/resolver/ob_resolver_utils.h b/src/sql/resolver/ob_resolver_utils.h index 6940883ac0..29a856cefe 100644 --- a/src/sql/resolver/ob_resolver_utils.h +++ b/src/sql/resolver/ob_resolver_utils.h @@ -302,6 +302,7 @@ public: ObIArray &columns, ObIArray &real_exprs, ObRawExpr *&expr, + pl::ObPLPackageGuard *package_guard, bool is_prepare_protocol = false, bool is_check_mode = false, bool is_sql_scope = false); diff --git a/src/sql/rewrite/ob_transform_dblink.cpp b/src/sql/rewrite/ob_transform_dblink.cpp index b64ba90ae5..01ebfe1081 100644 --- a/src/sql/rewrite/ob_transform_dblink.cpp +++ b/src/sql/rewrite/ob_transform_dblink.cpp @@ -309,6 +309,8 @@ int ObTransformDBlink::inner_reverse_link_table(ObDMLStmt *stmt, uint64_t target LOG_WARN("failed to reverse link table", K(ret)); } else if (OB_FAIL(reverse_link_sequence(*stmt, target_dblink_id))) { LOG_WARN("failed to reverse link sequence", K(ret)); + } else if (OB_FAIL(reverse_link_udf(*stmt, target_dblink_id))) { + LOG_WARN("failed to reverse link udf", K(ret), K(target_dblink_id)); } else if (OB_FAIL(formalize_link_table(stmt))) { LOG_WARN("failed to formalize link table", K(ret)); } @@ -426,6 +428,32 @@ int ObTransformDBlink::reverse_link_sequence(ObDMLStmt &stmt, uint64_t target_db return ret; } +int ObTransformDBlink::reverse_link_udf(ObDMLStmt &stmt, uint64_t target_dblink_id) +{ + int ret = OB_SUCCESS; + ObSEArray udf_exprs; + if (OB_FAIL(stmt.get_udf_exprs(udf_exprs))) { + LOG_WARN("failed to get udf exprs", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < udf_exprs.count(); ++i) { + ObRawExpr *expr = udf_exprs.at(i); + if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (expr->is_udf_expr()) { + ObUDFRawExpr *udf_expr = static_cast(expr); + if (target_dblink_id != udf_expr->get_dblink_id()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("read local udf in dblink write not support", K(ret)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "read local udf in dblink write not support"); + } else { + udf_expr->set_dblink_id(OB_INVALID_ID); + } + } + } + return ret; +} + int ObTransformDBlink::reverse_link_table_for_temp_table(ObDMLStmt *root_stmt, uint64_t target_dblink_id) { int ret = OB_SUCCESS; @@ -470,6 +498,8 @@ int ObTransformDBlink::pack_link_table(ObDMLStmt *stmt, bool &trans_happened) LOG_WARN("failed to reverse link table", K(ret)); } else if (OB_FAIL(reverse_link_sequence(*stmt, is_reverse_link ? 0 : dblink_id))) { LOG_WARN("failed to reverse link sequence", K(ret)); + } else if (OB_FAIL(reverse_link_udf(*stmt, is_reverse_link ? 0 : dblink_id))) { + LOG_WARN("failed to reverse link udf", K(ret), K(is_reverse_link), K(dblink_id)); } else if (OB_FAIL(formalize_link_table(stmt))) { LOG_WARN("failed to formalize link table stmt", K(ret)); } else if (lib::is_oracle_mode() && @@ -538,8 +568,7 @@ int ObTransformDBlink::check_link_expr_valid(ObRawExpr *expr, bool &is_valid) if (OB_ISNULL(expr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null expr", K(ret)); - } else if (expr->has_flag(CNT_PL_UDF) || - expr->has_flag(CNT_SO_UDF) || + } else if (expr->has_flag(CNT_SO_UDF) || expr->has_flag(CNT_DYNAMIC_USER_VARIABLE)) { // special flag is invalid } else if (T_FUN_APPROX_COUNT_DISTINCT_SYNOPSIS == expr->get_expr_type() || @@ -549,6 +578,24 @@ int ObTransformDBlink::check_link_expr_valid(ObRawExpr *expr, bool &is_valid) // special function is invalid } else if (expr->get_result_type().is_ext()) { // special type is invalid + } else if (expr->is_udf_expr()) { + ObUDFRawExpr *udf_expr = static_cast(expr); + if (OB_ISNULL(udf_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("udf expr is NULL", K(ret)); + } else if (OB_INVALID_ID == udf_expr->get_dblink_id()) { + // local udf need calc locally + } else { + ObRawExpr *param_expr = NULL; + bool has_ext_type = false; + for (int64_t i = 0; OB_SUCC(ret) && !has_ext_type && i < udf_expr->get_param_count(); i++) { + param_expr = udf_expr->get_param_expr(i); + if (NULL != param_expr) { + has_ext_type = udf_expr->get_result_type().is_ext(); + } + } + is_valid = !has_ext_type; + } } else { is_valid = true; } @@ -644,6 +691,25 @@ int ObTransformDBlink::collect_link_table(ObDMLStmt *stmt, } } } + // check udf + if (OB_SUCC(ret) && all_table_from_one_dblink) { + ObSEArray udf_exprs; + if (OB_FAIL(stmt->get_udf_exprs(udf_exprs))) { + LOG_WARN("failed to get udf exprs", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && all_table_from_one_dblink && i < udf_exprs.count(); i++) { + ObRawExpr *expr = udf_exprs.at(i); + if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect null expr", K(ret)); + } else if (expr->is_udf_expr()) { + ObUDFRawExpr *udf_expr = static_cast(expr); + if (dblink_id != udf_expr->get_dblink_id()) { + all_table_from_one_dblink = false; + } + } + } + } //check child stmt if (OB_SUCC(ret) && all_table_from_one_dblink) { ObSEArray child_stmts; diff --git a/src/sql/rewrite/ob_transform_dblink.h b/src/sql/rewrite/ob_transform_dblink.h index a6306f5022..be50375859 100644 --- a/src/sql/rewrite/ob_transform_dblink.h +++ b/src/sql/rewrite/ob_transform_dblink.h @@ -84,6 +84,8 @@ private: int reverse_link_sequence(ObDMLStmt &stmt, uint64_t target_dblink_id); + int reverse_link_udf(ObDMLStmt &stmt, uint64_t target_dblink_id); + int reverse_link_table_for_temp_table(ObDMLStmt *root_stmt, uint64_t target_dblink_id); int pack_link_table(ObDMLStmt *stmt, bool &trans_happened); diff --git a/src/sql/rewrite/ob_transform_pre_process.cpp b/src/sql/rewrite/ob_transform_pre_process.cpp index 395dae6840..2dfc584668 100644 --- a/src/sql/rewrite/ob_transform_pre_process.cpp +++ b/src/sql/rewrite/ob_transform_pre_process.cpp @@ -9614,42 +9614,44 @@ int ObTransformPreProcess::add_constructor_to_multiset(ObDMLStmt &stmt, LOG_WARN("get null expr", K(ret)); } else { pl::ObPLINS *ns = NULL; - pl::ObPLPackageGuard package_guard(session->get_effective_tenant_id()); - pl::ObPLResolveCtx resolve_ctx(*ctx_->allocator_, - *session, - *ctx_->schema_checker_->get_schema_guard(), - package_guard, - *ctx_->exec_ctx_->get_sql_proxy(), - false); - if (OB_FAIL(package_guard.init())) { - LOG_WARN("failed to init package guard", K(ret)); - } else if (OB_ISNULL(ns = - ((NULL == session->get_pl_context()) ? - static_cast(&resolve_ctx) : - static_cast(session->get_pl_context()->get_current_ctx())))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null", K(ret)); - } else if (OB_FAIL(object_type->get_size(*ns, pl::PL_TYPE_ROW_SIZE, rowsize))) { - LOG_WARN("failed to get size", K(ret)); - } else if (OB_FAIL(object_expr->add_access_name(object_type->get_name()))) { - LOG_WARN("failed to add access name", K(ret)); - } else { - object_expr->set_rowsize(rowsize); - ObExprResType res_type; - res_type.set_type(ObExtendType); - res_type.set_extend_type(pl::PL_RECORD_TYPE); - res_type.set_udt_id(object_type->get_user_type_id()); - object_expr->set_udt_id(object_type->get_user_type_id()); - object_expr->set_result_type(res_type); - object_expr->set_func_name(object_type->get_name()); - object_expr->set_coll_schema_version(elem_info->get_schema_version()); - // Add udt info to dependency - if (object_expr->need_add_dependency()) { - ObSchemaObjVersion udt_schema_version; - if (OB_FAIL(object_expr->get_schema_object_version(udt_schema_version))) { - LOG_WARN("failed to get schema version", K(ret)); - } else if (OB_FAIL(stmt.add_global_dependency_table(udt_schema_version))) { - LOG_WARN("failed to add dependency", K(ret)); + pl::ObPLPackageGuard *package_guard = NULL; + OZ (ctx_->exec_ctx_->get_package_guard(package_guard)); + CK (OB_NOT_NULL(package_guard)); + if (OB_SUCC(ret)) { + pl::ObPLResolveCtx resolve_ctx(*ctx_->allocator_, + *session, + *ctx_->schema_checker_->get_schema_guard(), + *package_guard, + *ctx_->exec_ctx_->get_sql_proxy(), + false); + if (OB_ISNULL(ns = + ((NULL == session->get_pl_context()) ? + static_cast(&resolve_ctx) : + static_cast(session->get_pl_context()->get_current_ctx())))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (OB_FAIL(object_type->get_size(pl::PL_TYPE_ROW_SIZE, rowsize))) { + LOG_WARN("failed to get size", K(ret)); + } else if (OB_FAIL(object_expr->add_access_name(object_type->get_name()))) { + LOG_WARN("failed to add access name", K(ret)); + } else { + object_expr->set_rowsize(rowsize); + ObExprResType res_type; + res_type.set_type(ObExtendType); + res_type.set_extend_type(pl::PL_RECORD_TYPE); + res_type.set_udt_id(object_type->get_user_type_id()); + object_expr->set_udt_id(object_type->get_user_type_id()); + object_expr->set_result_type(res_type); + object_expr->set_func_name(object_type->get_name()); + object_expr->set_coll_schema_version(elem_info->get_schema_version()); + // Add udt info to dependency + if (object_expr->need_add_dependency()) { + ObSchemaObjVersion udt_schema_version; + if (OB_FAIL(object_expr->get_schema_object_version(udt_schema_version))) { + LOG_WARN("failed to get schema version", K(ret)); + } else if (OB_FAIL(stmt.add_global_dependency_table(udt_schema_version))) { + LOG_WARN("failed to add dependency", K(ret)); + } } } } diff --git a/src/sql/rewrite/ob_transform_utils.cpp b/src/sql/rewrite/ob_transform_utils.cpp index e81d44eb95..091ea25464 100644 --- a/src/sql/rewrite/ob_transform_utils.cpp +++ b/src/sql/rewrite/ob_transform_utils.cpp @@ -12193,6 +12193,24 @@ int ObTransformUtils::extract_udt_exprs(ObRawExpr *expr, ObIArray & return ret; } +int ObTransformUtils::extract_udf_exprs(ObRawExpr *expr, ObIArray &udf_exprs) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret), K(expr)); + } else if (expr->is_udf_expr() && OB_FAIL(add_var_to_array_no_dup(udf_exprs, expr))) { + LOG_WARN("failed to add var to array no dup", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < expr->get_param_count(); ++i) { + if (OB_FAIL(SMART_CALL(extract_udf_exprs(expr->get_param_expr(i), udf_exprs)))) { + LOG_WARN("failed to extract udf exprs", K(ret)); + } + } + } + return ret; +} + int ObTransformUtils::extract_json_object_exprs(ObRawExpr *expr, ObIArray &json_exprs) { int ret = OB_SUCCESS; diff --git a/src/sql/rewrite/ob_transform_utils.h b/src/sql/rewrite/ob_transform_utils.h index 8476b36c21..4524de10db 100644 --- a/src/sql/rewrite/ob_transform_utils.h +++ b/src/sql/rewrite/ob_transform_utils.h @@ -899,6 +899,9 @@ public: ObIArray &equal_conds); static int extract_udt_exprs(ObRawExpr *expr, ObIArray &udt_exprs); + + static int extract_udf_exprs(ObRawExpr *expr, ObIArray &udf_exprs); + // json object with star : json_object(*) static int check_is_json_constraint(ObTransformerCtx *ctx, ObDMLStmt *stmt,