From a98352eacfb01252ed1cd6d640e9cb87ea3b7ad8 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 17 Jun 2024 18:26:59 +0000 Subject: [PATCH] modify dbms_scheduler executor code struct --- .../ob_dbms_sched_job_executor.cpp | 172 +++++++++++++++++- .../ob_dbms_sched_job_executor.h | 10 + .../ob_dbms_sched_job_utils.cpp | 170 ----------------- .../dbms_scheduler/ob_dbms_sched_job_utils.h | 16 -- .../mview/ob_mview_sched_job_utils.cpp | 11 +- 5 files changed, 188 insertions(+), 191 deletions(-) diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp index 9cbd12a247..f9a464d410 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.cpp @@ -56,6 +56,172 @@ int ObDBMSSchedJobExecutor::init( return ret; } +int ObDBMSSchedJobExecutor::init_session( + sql::ObSQLSessionInfo &session, + ObSchemaGetterGuard &schema_guard, + const ObString &tenant_name, uint64_t tenant_id, + const ObString &database_name, uint64_t database_id, + const ObUserInfo* user_info, + ObDBMSSchedJobInfo &job_info) +{ + int ret = OB_SUCCESS; + ObArenaAllocator *allocator = NULL; + const bool print_info_log = true; + const bool is_sys_tenant = true; + ObPCMemPctConf pc_mem_conf; + ObObj compatibility_mode; + ObObj sql_mode; + if (job_info.is_oracle_tenant_) { + compatibility_mode.set_int(1); + sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE); + } else { + compatibility_mode.set_int(0); + sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE); + } + OX (session.set_inner_session()); + OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant)); + OZ (session.update_max_packet_size()); + OZ (session.init_tenant(tenant_name.ptr(), tenant_id)); + OZ (session.load_all_sys_vars(schema_guard)); + OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode)); + OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode)); + OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT, + ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT)); + OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT, + ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT)); + OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT, + ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT)); + OZ (session.set_default_database(database_name)); + OZ (session.get_pc_mem_conf(pc_mem_conf)); + CK (OB_NOT_NULL(GCTX.sql_engine_)); + OX (session.set_database_id(database_id)); + OZ (session.set_user( + user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id())); + OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT)); + if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) { + // set larger timeout for mview scheduler jobs + const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours + const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours + ObObj query_timeout_obj; + ObObj trx_timeout_obj; + query_timeout_obj.set_int(QUERY_TIMEOUT_US); + trx_timeout_obj.set_int(TRX_TIMEOUT_US); + OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj)); + OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj)); + } + return ret; +} + +int ObDBMSSchedJobExecutor::init_env(ObDBMSSchedJobInfo &job_info, ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const ObTenantSchema *tenant_info = NULL; + const ObSysVariableSchema *sys_variable_schema = NULL; + ObSEArray user_infos; + const ObUserInfo* user_info = NULL; + const ObDatabaseSchema *database_schema = NULL; + share::schema::ObUserLoginInfo login_info; + ObExecEnv exec_env; + CK (OB_NOT_NULL(schema_service_)); + CK (job_info.valid()); + OZ (schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard)); + OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info)); + OZ (schema_guard.get_database_schema( + job_info.get_tenant_id(), job_info.get_cowner(), database_schema)); + if (OB_SUCC(ret)) { + if (job_info.is_oracle_tenant()) { + OZ (schema_guard.get_user_info( + job_info.get_tenant_id(), job_info.get_powner(), user_infos)); + OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos)); + CK (OB_NOT_NULL(user_info = user_infos.at(0))); + } else { + ObString user = job_info.get_powner(); + if (OB_SUCC(ret)) { + const char *c = user.reverse_find('@'); + if (OB_ISNULL(c)) { + OZ (schema_guard.get_user_info( + job_info.get_tenant_id(), user, user_infos)); + if (OB_SUCC(ret) && user_infos.count() > 1) { + OZ(ObDBMSSchedJobUtils::reserve_user_with_minimun_id(user_infos)); + } + OV (1 == user_infos.count(), 0 == user_infos.count() ? OB_USER_NOT_EXIST : OB_ERR_UNEXPECTED, K(job_info), K(user_infos)); + CK (OB_NOT_NULL(user_info = user_infos.at(0))); + } else { + ObString user_name; + ObString host_name; + user_name = user.split_on(c); + host_name = user; + OZ (schema_guard.get_user_info( + job_info.get_tenant_id(), user_name, host_name, user_info)); + } + } + } + CK (OB_NOT_NULL(user_info)); + CK (OB_NOT_NULL(tenant_info)); + CK (OB_NOT_NULL(database_schema)); + OZ (exec_env.init(job_info.get_exec_env())); + OZ (init_session(session, + schema_guard, + tenant_info->get_tenant_name(), + job_info.get_tenant_id(), + database_schema->get_database_name(), + database_schema->get_database_id(), + user_info, + job_info)); + OZ (exec_env.store(session)); + } + return ret; +} + +int ObDBMSSchedJobExecutor::create_session( + const uint64_t tenant_id, + ObFreeSessionCtx &free_session_ctx, + ObSQLSessionInfo *&session_info) +{ + int ret = OB_SUCCESS; + uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID; + uint64_t proxy_sid = 0; + if (OB_ISNULL(GCTX.session_mgr_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session_mgr_ is null", KR(ret)); + } else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) { + LOG_WARN("alloc session id failed", KR(ret)); + } else if (OB_FAIL(GCTX.session_mgr_->create_session( + tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) { + LOG_WARN("create session failed", K(ret), K(sid)); + GCTX.session_mgr_->mark_sessid_unused(sid); + session_info = NULL; + } else if (OB_ISNULL(session_info)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected session info is null", K(ret)); + } else { + free_session_ctx.sessid_ = sid; + free_session_ctx.proxy_sessid_ = proxy_sid; + } + return ret; +} + +int ObDBMSSchedJobExecutor::destroy_session( + ObFreeSessionCtx &free_session_ctx, + ObSQLSessionInfo *session_info) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(GCTX.session_mgr_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session_mgr_ is null", KR(ret)); + } else if (OB_ISNULL(session_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("session_info is null", KR(ret)); + } else { + session_info->set_session_sleep(); + GCTX.session_mgr_->revert_session(session_info); + GCTX.session_mgr_->free_session(free_session_ctx); + GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_); + } + return ret; +} + int ObDBMSSchedJobExecutor::run_dbms_sched_job( uint64_t tenant_id, ObDBMSSchedJobInfo &job_info) { @@ -69,7 +235,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job( CK (OB_LIKELY(inited_)); CK (OB_NOT_NULL(sql_proxy_)); CK (sql_proxy_->is_inited()); - if (OB_FAIL(ObDBMSSchedJobUtils::create_session(tenant_id, free_session_ctx, session_info))) { + if (OB_FAIL(ObDBMSSchedJobExecutor::create_session(tenant_id, free_session_ctx, session_info))) { LOG_WARN("failed to create session", KR(ret), K(tenant_id)); } else { CK (job_info.valid()); @@ -205,7 +371,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job( } } if (OB_SUCC(ret)) { - OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info)); + OZ (ObDBMSSchedJobExecutor::init_env(job_info, *session_info)); CK (OB_NOT_NULL(pool = static_cast(sql_proxy_->get_pool()))); OZ (pool->acquire_spi_conn(session_info, conn)); OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows)); @@ -217,7 +383,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job( } if (NULL != session_info) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObDBMSSchedJobUtils::destroy_session(free_session_ctx, session_info))) { + if (OB_TMP_FAIL(ObDBMSSchedJobExecutor::destroy_session(free_session_ctx, session_info))) { LOG_WARN("failed to destroy session", KR(tmp_ret)); ret = (OB_SUCC(ret)) ? tmp_ret : ret; } else { diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.h b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.h index d4f47cbf9c..a05f63c81a 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_executor.h @@ -33,8 +33,18 @@ public: int init( common::ObMySQLProxy *sql_proxy, share::schema::ObMultiVersionSchemaService *schema_service); int run_dbms_sched_job(uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name); + int init_env(ObDBMSSchedJobInfo &job_info, sql::ObSQLSessionInfo &session); private: + static int init_session( + sql::ObSQLSessionInfo &session, + share::schema::ObSchemaGetterGuard &schema_guard, + const common::ObString &tenant_name, uint64_t tenant_id, + const common::ObString &database_name, uint64_t database_id, + const share::schema::ObUserInfo* user_info, + ObDBMSSchedJobInfo &job_info); + int create_session(const uint64_t tenant_id, sql::ObFreeSessionCtx &free_session_ctx, sql::ObSQLSessionInfo *&session_info); + int destroy_session(sql::ObFreeSessionCtx &free_session_ctx, sql::ObSQLSessionInfo *session_info); int run_dbms_sched_job(uint64_t tenant_id, ObDBMSSchedJobInfo &job_info); bool inited_; diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.cpp b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.cpp index 76a24fa50d..3e958c3ece 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.cpp +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.cpp @@ -239,64 +239,6 @@ int ObDBMSSchedJobUtils::add_dbms_sched_job( return ret; } -int ObDBMSSchedJobUtils::init_session( - ObSQLSessionInfo &session, - ObSchemaGetterGuard &schema_guard, - const ObString &tenant_name, - uint64_t tenant_id, - const ObString &database_name, - uint64_t database_id, - const ObUserInfo* user_info, - const ObDBMSSchedJobInfo &job_info) -{ - int ret = OB_SUCCESS; - ObArenaAllocator *allocator = NULL; - const bool print_info_log = true; - const bool is_sys_tenant = true; - ObPCMemPctConf pc_mem_conf; - ObObj compatibility_mode; - ObObj sql_mode; - if (job_info.is_oracle_tenant_) { - compatibility_mode.set_int(1); - sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE); - } else { - compatibility_mode.set_int(0); - sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE); - } - OX (session.set_inner_session()); - OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant)); - OZ (session.update_max_packet_size()); - OZ (session.init_tenant(tenant_name.ptr(), tenant_id)); - OZ (session.load_all_sys_vars(schema_guard)); - OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode)); - OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode)); - OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT)); - OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT)); - OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT)); - OZ (session.set_default_database(database_name)); - OZ (session.get_pc_mem_conf(pc_mem_conf)); - CK (OB_NOT_NULL(GCTX.sql_engine_)); - OX (session.set_database_id(database_id)); - OZ (session.set_user( - user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id())); - OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT)); - if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) { - // set larger timeout for mview scheduler jobs - const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours - const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours - ObObj query_timeout_obj; - ObObj trx_timeout_obj; - query_timeout_obj.set_int(QUERY_TIMEOUT_US); - trx_timeout_obj.set_int(TRX_TIMEOUT_US); - OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj)); - OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj)); - } - return ret; -} - int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray &user_infos) { int ret = OB_SUCCESS; @@ -322,117 +264,5 @@ int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray user_infos; - const ObUserInfo* user_info = NULL; - const ObDatabaseSchema *database_schema = NULL; - share::schema::ObUserLoginInfo login_info; - ObExecEnv exec_env; - CK (OB_NOT_NULL(GCTX.schema_service_)); - CK (job_info.valid()); - OZ (GCTX.schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard)); - OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info)); - OZ (schema_guard.get_database_schema( - job_info.get_tenant_id(), job_info.get_cowner(), database_schema)); - if (OB_SUCC(ret)) { - if (job_info.is_oracle_tenant()) { - OZ (schema_guard.get_user_info( - job_info.get_tenant_id(), job_info.get_powner(), user_infos)); - OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos)); - CK (OB_NOT_NULL(user_info = user_infos.at(0))); - } else { - ObString user = job_info.get_powner(); - if (OB_SUCC(ret)) { - const char *c = user.reverse_find('@'); - if (OB_ISNULL(c)) { - OZ (schema_guard.get_user_info( - job_info.get_tenant_id(), user, user_infos)); - if (OB_SUCC(ret) && user_infos.count() > 1) { - OZ(reserve_user_with_minimun_id(user_infos)); - } - OV (1 == user_infos.count(), 0 == user_infos.count() ? OB_USER_NOT_EXIST : OB_ERR_UNEXPECTED, K(job_info), K(user_infos)); - CK (OB_NOT_NULL(user_info = user_infos.at(0))); - } else { - ObString user_name; - ObString host_name; - user_name = user.split_on(c); - host_name = user; - OZ (schema_guard.get_user_info( - job_info.get_tenant_id(), user_name, host_name, user_info)); - } - } - } - CK (OB_NOT_NULL(user_info)); - CK (OB_NOT_NULL(tenant_info)); - CK (OB_NOT_NULL(database_schema)); - OZ (exec_env.init(job_info.get_exec_env())); - OZ (init_session(session, - schema_guard, - tenant_info->get_tenant_name(), - job_info.get_tenant_id(), - database_schema->get_database_name(), - database_schema->get_database_id(), - user_info, - job_info)); - OZ (exec_env.store(session)); - } - return ret; -} - -int ObDBMSSchedJobUtils::create_session( - const uint64_t tenant_id, - ObFreeSessionCtx &free_session_ctx, - ObSQLSessionInfo *&session_info) -{ - int ret = OB_SUCCESS; - uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID; - uint64_t proxy_sid = 0; - if (OB_ISNULL(GCTX.session_mgr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session_mgr_ is null", KR(ret)); - } else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) { - LOG_WARN("alloc session id failed", KR(ret)); - } else if (OB_FAIL(GCTX.session_mgr_->create_session( - tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) { - LOG_WARN("create session failed", K(ret), K(sid)); - GCTX.session_mgr_->mark_sessid_unused(sid); - session_info = NULL; - } else if (OB_ISNULL(session_info)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected session info is null", K(ret)); - } else { - free_session_ctx.sessid_ = sid; - free_session_ctx.proxy_sessid_ = proxy_sid; - } - return ret; -} - -int ObDBMSSchedJobUtils::destroy_session( - ObFreeSessionCtx &free_session_ctx, - ObSQLSessionInfo *session_info) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(GCTX.session_mgr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session_mgr_ is null", KR(ret)); - } else if (OB_ISNULL(session_info)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("session_info is null", KR(ret)); - } else { - session_info->set_session_sleep(); - GCTX.session_mgr_->revert_session(session_info); - GCTX.session_mgr_->free_session(free_session_ctx); - GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_); - } - return ret; -} - } // end for namespace dbms_scheduler } // end for namespace oceanbase diff --git a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h index d5cf8a940b..985849884a 100644 --- a/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h +++ b/src/observer/dbms_scheduler/ob_dbms_sched_job_utils.h @@ -279,22 +279,6 @@ public: const uint64_t tenant_id, const int64_t job_id, const ObDBMSSchedJobInfo &job_info); - static int init_session( - sql::ObSQLSessionInfo &session, - share::schema::ObSchemaGetterGuard &schema_guard, - const common::ObString &tenant_name, - uint64_t tenant_id, - const common::ObString &database_name, - uint64_t database_id, - const share::schema::ObUserInfo* user_info, - const ObDBMSSchedJobInfo &job_info); - static int init_env(ObDBMSSchedJobInfo &job_info, - sql::ObSQLSessionInfo &session); - static int create_session(const uint64_t tenant_id, - sql::ObFreeSessionCtx &free_session_ctx, - sql::ObSQLSessionInfo *&session_info); - static int destroy_session(sql::ObFreeSessionCtx &free_session_ctx, - sql::ObSQLSessionInfo *session_info); static int reserve_user_with_minimun_id(ObIArray &user_infos); }; } diff --git a/src/storage/mview/ob_mview_sched_job_utils.cpp b/src/storage/mview/ob_mview_sched_job_utils.cpp index 907a98e7c9..77c3d276aa 100644 --- a/src/storage/mview/ob_mview_sched_job_utils.cpp +++ b/src/storage/mview/ob_mview_sched_job_utils.cpp @@ -14,6 +14,7 @@ #include "storage/mview/ob_mview_sched_job_utils.h" #include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h" +#include "observer/dbms_scheduler/ob_dbms_sched_job_executor.h" #include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h" #include "storage/ob_common_id_utils.h" #include "lib/ob_errno.h" @@ -366,9 +367,15 @@ int ObMViewSchedJobUtils::calc_date_expression( CREATE_WITH_TEMP_CONTEXT(ctx_param) { ObIAllocator &tmp_allocator = CURRENT_CONTEXT->get_arena_allocator(); SMART_VAR(ObSQLSessionInfo, session) { - if (OB_FAIL(session.init(1, 1, &tmp_allocator))) { + ObDBMSSchedJobExecutor executor; + if (OB_ISNULL(GCTX.sql_proxy_) || OB_ISNULL(GCTX.schema_service_)) { + ret = OB_INVALID_ERROR; + LOG_WARN("null ptr", K(ret), K(GCTX.sql_proxy_), K(GCTX.schema_service_)); + } else if (OB_FAIL(executor.init(GCTX.sql_proxy_,GCTX.schema_service_))) { + LOG_WARN("fail to init dbms sched job executor", K(ret)); + } else if (OB_FAIL(session.init(1, 1, &tmp_allocator))) { LOG_WARN("failed to init session", KR(ret)); - } else if (OB_FAIL(ObDBMSSchedJobUtils::init_env(job_info, session))) { + } else if (OB_FAIL(executor.init_env(job_info, session))) { LOG_WARN("failed to init env", KR(ret), K(job_info)); } else { bool is_oracle_mode = lib::is_oracle_mode();