[TABLELOCK] change system_variable on session when execute inner_sql to avoid defense errors
This commit is contained in:
@ -217,6 +217,8 @@ int ObLockFuncContext::open_inner_conn_()
|
||||
} else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("inner_conn_ or store_inner_conn_ should be null", K(ret), KP(inner_conn_), KP(store_inner_conn_));
|
||||
} else if (FALSE_IT(store_inner_conn_ = static_cast<observer::ObInnerSQLConnection *>(session->get_inner_conn()))) {
|
||||
} else if (FALSE_IT(session->set_inner_conn(nullptr))) {
|
||||
} else if (OB_FAIL(ObInnerConnectionLockUtil::create_inner_conn(session, sql_proxy_, inner_conn))) {
|
||||
LOG_WARN("create inner connection failed", K(ret), KPC(session));
|
||||
} else if (OB_ISNULL(inner_conn)) {
|
||||
@ -228,7 +230,6 @@ int ObLockFuncContext::open_inner_conn_()
|
||||
* so we put inner conn in session to share it within multi layer nested sql.
|
||||
*/
|
||||
inner_conn_ = inner_conn;
|
||||
store_inner_conn_ = static_cast<observer::ObInnerSQLConnection *>(session->get_inner_conn());
|
||||
session->set_inner_conn(inner_conn);
|
||||
LOG_DEBUG("ObLockFuncContext::open_inner_conn_ successfully",
|
||||
KP(inner_conn_),
|
||||
@ -243,13 +244,23 @@ int ObLockFuncContext::close_inner_conn_()
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = my_exec_ctx_->get_my_session();
|
||||
|
||||
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(session) || OB_ISNULL(inner_conn_)) {
|
||||
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(inner_conn_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("sql_proxy or inner_conn of session is NULL", K(ret), KP(sql_proxy_), KP(session), KP(inner_conn_));
|
||||
} else {
|
||||
session->set_inner_conn(store_inner_conn_); // restore inner_conn to session before close the tmp inner_conn
|
||||
OZ (sql_proxy_->close(inner_conn_, true));
|
||||
}
|
||||
if (OB_ISNULL(session)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("session is NULL", K(ret), KP(session));
|
||||
} else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) {
|
||||
// 1. if inner_conn_ is not null, means that we have created inner_conn successfully before, so we must have already
|
||||
// set store_inner_conn_ successfully, just restore it to the session.
|
||||
// 2. if inner_conn_ is null, it's uncertain whether store_inner_conn_ has been set before. If store_inner_conn_
|
||||
// is not null, it must have been set. Otherwise, the inner_conn on the session may be null, or it may have existed
|
||||
// with an error code before store_inner_conn_ being set. At this case, we do not set inner_conn on the session.
|
||||
session->set_inner_conn(store_inner_conn_);
|
||||
}
|
||||
sql_proxy_ = nullptr;
|
||||
inner_conn_ = nullptr;
|
||||
store_inner_conn_ = nullptr;
|
||||
|
@ -462,26 +462,51 @@ int ObInnerConnectionLockUtil::create_inner_conn(sql::ObSQLSessionInfo *session_
|
||||
observer::ObInnerSQLConnection *&inner_conn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObObj mysql_mode;
|
||||
ObObj current_mode;
|
||||
mysql_mode.set_int(0);
|
||||
current_mode.set_int(-1);
|
||||
|
||||
observer::ObInnerSQLConnectionPool *pool = nullptr;
|
||||
common::sqlclient::ObISQLConnection *conn = nullptr;
|
||||
|
||||
lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL);
|
||||
if (OB_ISNULL(session_info) || OB_ISNULL(sql_proxy)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("session or sql_proxy is NULL", KP(session_info), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(pool = static_cast<observer::ObInnerSQLConnectionPool *>(sql_proxy->get_pool()))) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("connection pool is NULL", K(ret));
|
||||
} else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) {
|
||||
LOG_WARN("can not get the compat_mode", KPC(session_info));
|
||||
} else if (current_mode != mysql_mode
|
||||
&& OB_FAIL(session_info->update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, mysql_mode))) {
|
||||
LOG_WARN("update session_info to msyql_mode failed", KR(ret), KPC(session_info));
|
||||
} else if (common::sqlclient::INNER_POOL != pool->get_type()) {
|
||||
LOG_WARN("connection pool type is not inner", K(ret), K(pool->get_type()));
|
||||
// NOTICE: although we can set is_oracle_mode here, internally it will prioritize referencing
|
||||
// the system variables on the session, so this parameter actually has no effect
|
||||
} else if (OB_FAIL(pool->acquire(session_info, conn, false /* is_oracle_mode */))) {
|
||||
LOG_WARN("acquire connection from inner sql connection pool failed", KR(ret), KPC(session_info));
|
||||
} else if (OB_ISNULL(conn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("acquire new connection but it's null", KR(ret), KPC(session_info));
|
||||
} else {
|
||||
inner_conn = static_cast<observer::ObInnerSQLConnection *>(conn);
|
||||
// the inner_connection must be mysql_mode, so that we can write inner_table
|
||||
inner_conn->set_mysql_compat_mode();
|
||||
// we must use mysql_mode connection to write inner_table
|
||||
if (OB_UNLIKELY(inner_conn->is_oracle_compat_mode())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("create an oracle mode inner connection", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (current_mode != mysql_mode && current_mode.get_int() != -1 && OB_NOT_NULL(session_info)
|
||||
&& OB_TMP_FAIL(session_info->update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) {
|
||||
ret = OB_SUCCESS == ret ? tmp_ret : ret;
|
||||
LOG_WARN("failed to update sys variable for compatibility mode", K(current_mode), KPC(session_info));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -490,6 +515,9 @@ int ObInnerConnectionLockUtil::execute_write_sql(observer::ObInnerSQLConnection
|
||||
int64_t &affected_rows)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool need_reset_sess_mode = false;
|
||||
bool need_reset_conn_mode = false;
|
||||
|
||||
// NOTICE: This will be overwritten by the oracle_made_ field on connection,
|
||||
// but for safety reason, we still set up a compat_guard here.
|
||||
@ -498,10 +526,17 @@ int ObInnerConnectionLockUtil::execute_write_sql(observer::ObInnerSQLConnection
|
||||
if (OB_ISNULL(conn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("inner_conn is nullptr", K(ret), K(sql));
|
||||
} else if (OB_FAIL(conn->execute_write(MTL_ID(), sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("execute write sql failed", K(ret), K(sql));
|
||||
} else {
|
||||
if (OB_FAIL(set_to_mysql_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) {
|
||||
LOG_WARN("set to mysql compat_mode failed", K(ret), K(sql));
|
||||
} else if (OB_FAIL(conn->execute_write(MTL_ID(), sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("execute write sql failed", K(ret), K(sql));
|
||||
}
|
||||
if (OB_TMP_FAIL(reset_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) {
|
||||
LOG_WARN("reset compat_mode failed", K(ret), K(tmp_ret), K(sql));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("ObInnerConnectionLockUtil::execute_write_sql", K(ret), KP(conn), K(conn->is_oracle_compat_mode()));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -510,6 +545,9 @@ int ObInnerConnectionLockUtil::execute_read_sql(observer::ObInnerSQLConnection *
|
||||
ObISQLClient::ReadResult &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool need_reset_sess_mode = false;
|
||||
bool need_reset_conn_mode = false;
|
||||
|
||||
// NOTICE: This will be overwritten by the oracle_made_ field on connection,
|
||||
// but for safety reason, we still set up a compat_guard here.
|
||||
@ -518,10 +556,17 @@ int ObInnerConnectionLockUtil::execute_read_sql(observer::ObInnerSQLConnection *
|
||||
if (OB_ISNULL(conn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("inner_conn is nullptr", K(ret), K(sql));
|
||||
} else if (OB_FAIL(conn->execute_read(MTL_ID(), sql.ptr(), res))) {
|
||||
LOG_WARN("execute read sql failed", K(ret), K(sql));
|
||||
} else {
|
||||
if (OB_FAIL(set_to_mysql_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) {
|
||||
LOG_WARN("set to mysql compat_mode failed", K(ret), K(sql));
|
||||
} else if (OB_FAIL(conn->execute_read(MTL_ID(), sql.ptr(), res))) {
|
||||
LOG_WARN("execute read sql failed", K(ret), K(sql));
|
||||
}
|
||||
if (OB_TMP_FAIL(reset_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) {
|
||||
LOG_WARN("reset compat_mode failed", K(ret), K(tmp_ret), K(sql));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("ObInnerConnectionLockUtil::execute_read_sql", K(ret), KP(conn), K(conn->is_oracle_compat_mode()));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -919,6 +964,45 @@ int ObInnerConnectionLockUtil::get_org_cluster_id_(ObSQLSessionInfo *session, in
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::set_to_mysql_compat_mode_(observer::ObInnerSQLConnection *conn, bool &need_reset_sess_mode, bool &need_reset_conn_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObObj current_mode;
|
||||
ObObj mysql_mode;
|
||||
current_mode.set_int(-1);
|
||||
mysql_mode.set_int(0);
|
||||
need_reset_sess_mode = false;
|
||||
need_reset_conn_mode = false;
|
||||
|
||||
if (OB_FAIL(conn->get_session().get_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) {
|
||||
LOG_WARN("can not get the compat_mode", );
|
||||
} else if (FALSE_IT(need_reset_sess_mode = (current_mode != mysql_mode))) {
|
||||
} else if (need_reset_sess_mode
|
||||
&& OB_FAIL(conn->get_session().update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, mysql_mode))) {
|
||||
LOG_WARN("update compat_mode to mysql_mode failed");
|
||||
} else if (conn->is_oracle_compat_mode()) {
|
||||
need_reset_conn_mode = true;
|
||||
conn->set_mysql_compat_mode();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::reset_compat_mode_(observer::ObInnerSQLConnection *conn, const bool need_reset_sess_mode, const bool need_reset_conn_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObObj oracle_mode;
|
||||
oracle_mode.set_int(1);
|
||||
|
||||
if (need_reset_sess_mode
|
||||
&& OB_FAIL(conn->get_session().update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, oracle_mode))) {
|
||||
LOG_WARN("failed to update sys variable for compatibility mode", K(ret));
|
||||
}
|
||||
if (need_reset_conn_mode) {
|
||||
conn->set_oracle_compat_mode();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
#undef REQUEST_LOCK_4_1
|
||||
} // tablelock
|
||||
} // transaction
|
||||
|
@ -176,6 +176,12 @@ private:
|
||||
const obrpc::ObInnerSQLTransmitArg::InnerSQLOperationType operation_type,
|
||||
observer::ObInnerSQLConnection *conn);
|
||||
static int get_org_cluster_id_(sql::ObSQLSessionInfo *session, int64_t &org_cluster_id);
|
||||
static int set_to_mysql_compat_mode_(observer::ObInnerSQLConnection *conn,
|
||||
bool &need_reset_sess_mode,
|
||||
bool &need_reset_conn_mode);
|
||||
static int reset_compat_mode_(observer::ObInnerSQLConnection *conn,
|
||||
const bool need_reset_sess_mode,
|
||||
const bool need_reset_conn_mode);
|
||||
};
|
||||
|
||||
} // tablelock
|
||||
|
Reference in New Issue
Block a user