modify dbms_scheduler executor code struct
This commit is contained in:
@ -56,6 +56,172 @@ int ObDBMSSchedJobExecutor::init(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObDBMSSchedJobExecutor::init_session(
|
||||||
|
sql::ObSQLSessionInfo &session,
|
||||||
|
ObSchemaGetterGuard &schema_guard,
|
||||||
|
const ObString &tenant_name, uint64_t tenant_id,
|
||||||
|
const ObString &database_name, uint64_t database_id,
|
||||||
|
const ObUserInfo* user_info,
|
||||||
|
ObDBMSSchedJobInfo &job_info)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObArenaAllocator *allocator = NULL;
|
||||||
|
const bool print_info_log = true;
|
||||||
|
const bool is_sys_tenant = true;
|
||||||
|
ObPCMemPctConf pc_mem_conf;
|
||||||
|
ObObj compatibility_mode;
|
||||||
|
ObObj sql_mode;
|
||||||
|
if (job_info.is_oracle_tenant_) {
|
||||||
|
compatibility_mode.set_int(1);
|
||||||
|
sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
|
||||||
|
} else {
|
||||||
|
compatibility_mode.set_int(0);
|
||||||
|
sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE);
|
||||||
|
}
|
||||||
|
OX (session.set_inner_session());
|
||||||
|
OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
|
||||||
|
OZ (session.update_max_packet_size());
|
||||||
|
OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
|
||||||
|
OZ (session.load_all_sys_vars(schema_guard));
|
||||||
|
OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode));
|
||||||
|
OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode));
|
||||||
|
OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
|
||||||
|
ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
|
||||||
|
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
|
||||||
|
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
|
||||||
|
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
|
||||||
|
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
|
||||||
|
OZ (session.set_default_database(database_name));
|
||||||
|
OZ (session.get_pc_mem_conf(pc_mem_conf));
|
||||||
|
CK (OB_NOT_NULL(GCTX.sql_engine_));
|
||||||
|
OX (session.set_database_id(database_id));
|
||||||
|
OZ (session.set_user(
|
||||||
|
user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
|
||||||
|
OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
|
||||||
|
if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
|
||||||
|
// set larger timeout for mview scheduler jobs
|
||||||
|
const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
|
||||||
|
const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
|
||||||
|
ObObj query_timeout_obj;
|
||||||
|
ObObj trx_timeout_obj;
|
||||||
|
query_timeout_obj.set_int(QUERY_TIMEOUT_US);
|
||||||
|
trx_timeout_obj.set_int(TRX_TIMEOUT_US);
|
||||||
|
OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
|
||||||
|
OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObDBMSSchedJobExecutor::init_env(ObDBMSSchedJobInfo &job_info, ObSQLSessionInfo &session)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObSchemaGetterGuard schema_guard;
|
||||||
|
const ObTenantSchema *tenant_info = NULL;
|
||||||
|
const ObSysVariableSchema *sys_variable_schema = NULL;
|
||||||
|
ObSEArray<const ObUserInfo *, 1> user_infos;
|
||||||
|
const ObUserInfo* user_info = NULL;
|
||||||
|
const ObDatabaseSchema *database_schema = NULL;
|
||||||
|
share::schema::ObUserLoginInfo login_info;
|
||||||
|
ObExecEnv exec_env;
|
||||||
|
CK (OB_NOT_NULL(schema_service_));
|
||||||
|
CK (job_info.valid());
|
||||||
|
OZ (schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
|
||||||
|
OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
|
||||||
|
OZ (schema_guard.get_database_schema(
|
||||||
|
job_info.get_tenant_id(), job_info.get_cowner(), database_schema));
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
if (job_info.is_oracle_tenant()) {
|
||||||
|
OZ (schema_guard.get_user_info(
|
||||||
|
job_info.get_tenant_id(), job_info.get_powner(), user_infos));
|
||||||
|
OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
|
||||||
|
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
|
||||||
|
} else {
|
||||||
|
ObString user = job_info.get_powner();
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
const char *c = user.reverse_find('@');
|
||||||
|
if (OB_ISNULL(c)) {
|
||||||
|
OZ (schema_guard.get_user_info(
|
||||||
|
job_info.get_tenant_id(), user, user_infos));
|
||||||
|
if (OB_SUCC(ret) && user_infos.count() > 1) {
|
||||||
|
OZ(ObDBMSSchedJobUtils::reserve_user_with_minimun_id(user_infos));
|
||||||
|
}
|
||||||
|
OV (1 == user_infos.count(), 0 == user_infos.count() ? OB_USER_NOT_EXIST : OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
|
||||||
|
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
|
||||||
|
} else {
|
||||||
|
ObString user_name;
|
||||||
|
ObString host_name;
|
||||||
|
user_name = user.split_on(c);
|
||||||
|
host_name = user;
|
||||||
|
OZ (schema_guard.get_user_info(
|
||||||
|
job_info.get_tenant_id(), user_name, host_name, user_info));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CK (OB_NOT_NULL(user_info));
|
||||||
|
CK (OB_NOT_NULL(tenant_info));
|
||||||
|
CK (OB_NOT_NULL(database_schema));
|
||||||
|
OZ (exec_env.init(job_info.get_exec_env()));
|
||||||
|
OZ (init_session(session,
|
||||||
|
schema_guard,
|
||||||
|
tenant_info->get_tenant_name(),
|
||||||
|
job_info.get_tenant_id(),
|
||||||
|
database_schema->get_database_name(),
|
||||||
|
database_schema->get_database_id(),
|
||||||
|
user_info,
|
||||||
|
job_info));
|
||||||
|
OZ (exec_env.store(session));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObDBMSSchedJobExecutor::create_session(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
ObFreeSessionCtx &free_session_ctx,
|
||||||
|
ObSQLSessionInfo *&session_info)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
|
||||||
|
uint64_t proxy_sid = 0;
|
||||||
|
if (OB_ISNULL(GCTX.session_mgr_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("session_mgr_ is null", KR(ret));
|
||||||
|
} else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
|
||||||
|
LOG_WARN("alloc session id failed", KR(ret));
|
||||||
|
} else if (OB_FAIL(GCTX.session_mgr_->create_session(
|
||||||
|
tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) {
|
||||||
|
LOG_WARN("create session failed", K(ret), K(sid));
|
||||||
|
GCTX.session_mgr_->mark_sessid_unused(sid);
|
||||||
|
session_info = NULL;
|
||||||
|
} else if (OB_ISNULL(session_info)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected session info is null", K(ret));
|
||||||
|
} else {
|
||||||
|
free_session_ctx.sessid_ = sid;
|
||||||
|
free_session_ctx.proxy_sessid_ = proxy_sid;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObDBMSSchedJobExecutor::destroy_session(
|
||||||
|
ObFreeSessionCtx &free_session_ctx,
|
||||||
|
ObSQLSessionInfo *session_info)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_ISNULL(GCTX.session_mgr_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("session_mgr_ is null", KR(ret));
|
||||||
|
} else if (OB_ISNULL(session_info)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("session_info is null", KR(ret));
|
||||||
|
} else {
|
||||||
|
session_info->set_session_sleep();
|
||||||
|
GCTX.session_mgr_->revert_session(session_info);
|
||||||
|
GCTX.session_mgr_->free_session(free_session_ctx);
|
||||||
|
GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObDBMSSchedJobExecutor::run_dbms_sched_job(
|
int ObDBMSSchedJobExecutor::run_dbms_sched_job(
|
||||||
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info)
|
uint64_t tenant_id, ObDBMSSchedJobInfo &job_info)
|
||||||
{
|
{
|
||||||
@ -69,7 +235,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
|
|||||||
CK (OB_LIKELY(inited_));
|
CK (OB_LIKELY(inited_));
|
||||||
CK (OB_NOT_NULL(sql_proxy_));
|
CK (OB_NOT_NULL(sql_proxy_));
|
||||||
CK (sql_proxy_->is_inited());
|
CK (sql_proxy_->is_inited());
|
||||||
if (OB_FAIL(ObDBMSSchedJobUtils::create_session(tenant_id, free_session_ctx, session_info))) {
|
if (OB_FAIL(ObDBMSSchedJobExecutor::create_session(tenant_id, free_session_ctx, session_info))) {
|
||||||
LOG_WARN("failed to create session", KR(ret), K(tenant_id));
|
LOG_WARN("failed to create session", KR(ret), K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
CK (job_info.valid());
|
CK (job_info.valid());
|
||||||
@ -205,7 +371,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
OZ (ObDBMSSchedJobUtils::init_env(job_info, *session_info));
|
OZ (ObDBMSSchedJobExecutor::init_env(job_info, *session_info));
|
||||||
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(sql_proxy_->get_pool())));
|
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(sql_proxy_->get_pool())));
|
||||||
OZ (pool->acquire_spi_conn(session_info, conn));
|
OZ (pool->acquire_spi_conn(session_info, conn));
|
||||||
OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
|
OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
|
||||||
@ -217,7 +383,7 @@ int ObDBMSSchedJobExecutor::run_dbms_sched_job(
|
|||||||
}
|
}
|
||||||
if (NULL != session_info) {
|
if (NULL != session_info) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_TMP_FAIL(ObDBMSSchedJobUtils::destroy_session(free_session_ctx, session_info))) {
|
if (OB_TMP_FAIL(ObDBMSSchedJobExecutor::destroy_session(free_session_ctx, session_info))) {
|
||||||
LOG_WARN("failed to destroy session", KR(tmp_ret));
|
LOG_WARN("failed to destroy session", KR(tmp_ret));
|
||||||
ret = (OB_SUCC(ret)) ? tmp_ret : ret;
|
ret = (OB_SUCC(ret)) ? tmp_ret : ret;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -33,8 +33,18 @@ public:
|
|||||||
int init(
|
int init(
|
||||||
common::ObMySQLProxy *sql_proxy, share::schema::ObMultiVersionSchemaService *schema_service);
|
common::ObMySQLProxy *sql_proxy, share::schema::ObMultiVersionSchemaService *schema_service);
|
||||||
int run_dbms_sched_job(uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name);
|
int run_dbms_sched_job(uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const ObString &job_name);
|
||||||
|
int init_env(ObDBMSSchedJobInfo &job_info, sql::ObSQLSessionInfo &session);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
static int init_session(
|
||||||
|
sql::ObSQLSessionInfo &session,
|
||||||
|
share::schema::ObSchemaGetterGuard &schema_guard,
|
||||||
|
const common::ObString &tenant_name, uint64_t tenant_id,
|
||||||
|
const common::ObString &database_name, uint64_t database_id,
|
||||||
|
const share::schema::ObUserInfo* user_info,
|
||||||
|
ObDBMSSchedJobInfo &job_info);
|
||||||
|
int create_session(const uint64_t tenant_id, sql::ObFreeSessionCtx &free_session_ctx, sql::ObSQLSessionInfo *&session_info);
|
||||||
|
int destroy_session(sql::ObFreeSessionCtx &free_session_ctx, sql::ObSQLSessionInfo *session_info);
|
||||||
int run_dbms_sched_job(uint64_t tenant_id, ObDBMSSchedJobInfo &job_info);
|
int run_dbms_sched_job(uint64_t tenant_id, ObDBMSSchedJobInfo &job_info);
|
||||||
|
|
||||||
bool inited_;
|
bool inited_;
|
||||||
|
|||||||
@ -239,64 +239,6 @@ int ObDBMSSchedJobUtils::add_dbms_sched_job(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDBMSSchedJobUtils::init_session(
|
|
||||||
ObSQLSessionInfo &session,
|
|
||||||
ObSchemaGetterGuard &schema_guard,
|
|
||||||
const ObString &tenant_name,
|
|
||||||
uint64_t tenant_id,
|
|
||||||
const ObString &database_name,
|
|
||||||
uint64_t database_id,
|
|
||||||
const ObUserInfo* user_info,
|
|
||||||
const ObDBMSSchedJobInfo &job_info)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
ObArenaAllocator *allocator = NULL;
|
|
||||||
const bool print_info_log = true;
|
|
||||||
const bool is_sys_tenant = true;
|
|
||||||
ObPCMemPctConf pc_mem_conf;
|
|
||||||
ObObj compatibility_mode;
|
|
||||||
ObObj sql_mode;
|
|
||||||
if (job_info.is_oracle_tenant_) {
|
|
||||||
compatibility_mode.set_int(1);
|
|
||||||
sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
|
|
||||||
} else {
|
|
||||||
compatibility_mode.set_int(0);
|
|
||||||
sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE);
|
|
||||||
}
|
|
||||||
OX (session.set_inner_session());
|
|
||||||
OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
|
|
||||||
OZ (session.update_max_packet_size());
|
|
||||||
OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
|
|
||||||
OZ (session.load_all_sys_vars(schema_guard));
|
|
||||||
OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode));
|
|
||||||
OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode));
|
|
||||||
OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
|
|
||||||
ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
|
|
||||||
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
|
|
||||||
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
|
|
||||||
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
|
|
||||||
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
|
|
||||||
OZ (session.set_default_database(database_name));
|
|
||||||
OZ (session.get_pc_mem_conf(pc_mem_conf));
|
|
||||||
CK (OB_NOT_NULL(GCTX.sql_engine_));
|
|
||||||
OX (session.set_database_id(database_id));
|
|
||||||
OZ (session.set_user(
|
|
||||||
user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
|
|
||||||
OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
|
|
||||||
if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
|
|
||||||
// set larger timeout for mview scheduler jobs
|
|
||||||
const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
|
|
||||||
const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
|
|
||||||
ObObj query_timeout_obj;
|
|
||||||
ObObj trx_timeout_obj;
|
|
||||||
query_timeout_obj.set_int(QUERY_TIMEOUT_US);
|
|
||||||
trx_timeout_obj.set_int(TRX_TIMEOUT_US);
|
|
||||||
OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
|
|
||||||
OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray<const ObUserInfo *> &user_infos)
|
int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray<const ObUserInfo *> &user_infos)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -322,117 +264,5 @@ int ObDBMSSchedJobUtils::reserve_user_with_minimun_id(ObIArray<const ObUserInfo
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDBMSSchedJobUtils::init_env(
|
|
||||||
ObDBMSSchedJobInfo &job_info,
|
|
||||||
ObSQLSessionInfo &session)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
ObSchemaGetterGuard schema_guard;
|
|
||||||
const ObTenantSchema *tenant_info = NULL;
|
|
||||||
const ObSysVariableSchema *sys_variable_schema = NULL;
|
|
||||||
ObSEArray<const ObUserInfo *, 1> user_infos;
|
|
||||||
const ObUserInfo* user_info = NULL;
|
|
||||||
const ObDatabaseSchema *database_schema = NULL;
|
|
||||||
share::schema::ObUserLoginInfo login_info;
|
|
||||||
ObExecEnv exec_env;
|
|
||||||
CK (OB_NOT_NULL(GCTX.schema_service_));
|
|
||||||
CK (job_info.valid());
|
|
||||||
OZ (GCTX.schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
|
|
||||||
OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
|
|
||||||
OZ (schema_guard.get_database_schema(
|
|
||||||
job_info.get_tenant_id(), job_info.get_cowner(), database_schema));
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
if (job_info.is_oracle_tenant()) {
|
|
||||||
OZ (schema_guard.get_user_info(
|
|
||||||
job_info.get_tenant_id(), job_info.get_powner(), user_infos));
|
|
||||||
OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
|
|
||||||
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
|
|
||||||
} else {
|
|
||||||
ObString user = job_info.get_powner();
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
const char *c = user.reverse_find('@');
|
|
||||||
if (OB_ISNULL(c)) {
|
|
||||||
OZ (schema_guard.get_user_info(
|
|
||||||
job_info.get_tenant_id(), user, user_infos));
|
|
||||||
if (OB_SUCC(ret) && user_infos.count() > 1) {
|
|
||||||
OZ(reserve_user_with_minimun_id(user_infos));
|
|
||||||
}
|
|
||||||
OV (1 == user_infos.count(), 0 == user_infos.count() ? OB_USER_NOT_EXIST : OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
|
|
||||||
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
|
|
||||||
} else {
|
|
||||||
ObString user_name;
|
|
||||||
ObString host_name;
|
|
||||||
user_name = user.split_on(c);
|
|
||||||
host_name = user;
|
|
||||||
OZ (schema_guard.get_user_info(
|
|
||||||
job_info.get_tenant_id(), user_name, host_name, user_info));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
CK (OB_NOT_NULL(user_info));
|
|
||||||
CK (OB_NOT_NULL(tenant_info));
|
|
||||||
CK (OB_NOT_NULL(database_schema));
|
|
||||||
OZ (exec_env.init(job_info.get_exec_env()));
|
|
||||||
OZ (init_session(session,
|
|
||||||
schema_guard,
|
|
||||||
tenant_info->get_tenant_name(),
|
|
||||||
job_info.get_tenant_id(),
|
|
||||||
database_schema->get_database_name(),
|
|
||||||
database_schema->get_database_id(),
|
|
||||||
user_info,
|
|
||||||
job_info));
|
|
||||||
OZ (exec_env.store(session));
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObDBMSSchedJobUtils::create_session(
|
|
||||||
const uint64_t tenant_id,
|
|
||||||
ObFreeSessionCtx &free_session_ctx,
|
|
||||||
ObSQLSessionInfo *&session_info)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
|
|
||||||
uint64_t proxy_sid = 0;
|
|
||||||
if (OB_ISNULL(GCTX.session_mgr_)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("session_mgr_ is null", KR(ret));
|
|
||||||
} else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
|
|
||||||
LOG_WARN("alloc session id failed", KR(ret));
|
|
||||||
} else if (OB_FAIL(GCTX.session_mgr_->create_session(
|
|
||||||
tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) {
|
|
||||||
LOG_WARN("create session failed", K(ret), K(sid));
|
|
||||||
GCTX.session_mgr_->mark_sessid_unused(sid);
|
|
||||||
session_info = NULL;
|
|
||||||
} else if (OB_ISNULL(session_info)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected session info is null", K(ret));
|
|
||||||
} else {
|
|
||||||
free_session_ctx.sessid_ = sid;
|
|
||||||
free_session_ctx.proxy_sessid_ = proxy_sid;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObDBMSSchedJobUtils::destroy_session(
|
|
||||||
ObFreeSessionCtx &free_session_ctx,
|
|
||||||
ObSQLSessionInfo *session_info)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_ISNULL(GCTX.session_mgr_)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("session_mgr_ is null", KR(ret));
|
|
||||||
} else if (OB_ISNULL(session_info)) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("session_info is null", KR(ret));
|
|
||||||
} else {
|
|
||||||
session_info->set_session_sleep();
|
|
||||||
GCTX.session_mgr_->revert_session(session_info);
|
|
||||||
GCTX.session_mgr_->free_session(free_session_ctx);
|
|
||||||
GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // end for namespace dbms_scheduler
|
} // end for namespace dbms_scheduler
|
||||||
} // end for namespace oceanbase
|
} // end for namespace oceanbase
|
||||||
|
|||||||
@ -279,22 +279,6 @@ public:
|
|||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t job_id,
|
const int64_t job_id,
|
||||||
const ObDBMSSchedJobInfo &job_info);
|
const ObDBMSSchedJobInfo &job_info);
|
||||||
static int init_session(
|
|
||||||
sql::ObSQLSessionInfo &session,
|
|
||||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
|
||||||
const common::ObString &tenant_name,
|
|
||||||
uint64_t tenant_id,
|
|
||||||
const common::ObString &database_name,
|
|
||||||
uint64_t database_id,
|
|
||||||
const share::schema::ObUserInfo* user_info,
|
|
||||||
const ObDBMSSchedJobInfo &job_info);
|
|
||||||
static int init_env(ObDBMSSchedJobInfo &job_info,
|
|
||||||
sql::ObSQLSessionInfo &session);
|
|
||||||
static int create_session(const uint64_t tenant_id,
|
|
||||||
sql::ObFreeSessionCtx &free_session_ctx,
|
|
||||||
sql::ObSQLSessionInfo *&session_info);
|
|
||||||
static int destroy_session(sql::ObFreeSessionCtx &free_session_ctx,
|
|
||||||
sql::ObSQLSessionInfo *session_info);
|
|
||||||
static int reserve_user_with_minimun_id(ObIArray<const ObUserInfo *> &user_infos);
|
static int reserve_user_with_minimun_id(ObIArray<const ObUserInfo *> &user_infos);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
#include "storage/mview/ob_mview_sched_job_utils.h"
|
#include "storage/mview/ob_mview_sched_job_utils.h"
|
||||||
#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
|
#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
|
||||||
|
#include "observer/dbms_scheduler/ob_dbms_sched_job_executor.h"
|
||||||
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
|
#include "observer/dbms_scheduler/ob_dbms_sched_table_operator.h"
|
||||||
#include "storage/ob_common_id_utils.h"
|
#include "storage/ob_common_id_utils.h"
|
||||||
#include "lib/ob_errno.h"
|
#include "lib/ob_errno.h"
|
||||||
@ -366,9 +367,15 @@ int ObMViewSchedJobUtils::calc_date_expression(
|
|||||||
CREATE_WITH_TEMP_CONTEXT(ctx_param) {
|
CREATE_WITH_TEMP_CONTEXT(ctx_param) {
|
||||||
ObIAllocator &tmp_allocator = CURRENT_CONTEXT->get_arena_allocator();
|
ObIAllocator &tmp_allocator = CURRENT_CONTEXT->get_arena_allocator();
|
||||||
SMART_VAR(ObSQLSessionInfo, session) {
|
SMART_VAR(ObSQLSessionInfo, session) {
|
||||||
if (OB_FAIL(session.init(1, 1, &tmp_allocator))) {
|
ObDBMSSchedJobExecutor executor;
|
||||||
|
if (OB_ISNULL(GCTX.sql_proxy_) || OB_ISNULL(GCTX.schema_service_)) {
|
||||||
|
ret = OB_INVALID_ERROR;
|
||||||
|
LOG_WARN("null ptr", K(ret), K(GCTX.sql_proxy_), K(GCTX.schema_service_));
|
||||||
|
} else if (OB_FAIL(executor.init(GCTX.sql_proxy_,GCTX.schema_service_))) {
|
||||||
|
LOG_WARN("fail to init dbms sched job executor", K(ret));
|
||||||
|
} else if (OB_FAIL(session.init(1, 1, &tmp_allocator))) {
|
||||||
LOG_WARN("failed to init session", KR(ret));
|
LOG_WARN("failed to init session", KR(ret));
|
||||||
} else if (OB_FAIL(ObDBMSSchedJobUtils::init_env(job_info, session))) {
|
} else if (OB_FAIL(executor.init_env(job_info, session))) {
|
||||||
LOG_WARN("failed to init env", KR(ret), K(job_info));
|
LOG_WARN("failed to init env", KR(ret), K(job_info));
|
||||||
} else {
|
} else {
|
||||||
bool is_oracle_mode = lib::is_oracle_mode();
|
bool is_oracle_mode = lib::is_oracle_mode();
|
||||||
|
|||||||
Reference in New Issue
Block a user