diff --git a/src/storage/tablelock/ob_lock_func_executor.cpp b/src/storage/tablelock/ob_lock_func_executor.cpp index 490e81466d..95822a95ff 100644 --- a/src/storage/tablelock/ob_lock_func_executor.cpp +++ b/src/storage/tablelock/ob_lock_func_executor.cpp @@ -217,6 +217,8 @@ int ObLockFuncContext::open_inner_conn_() } else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("inner_conn_ or store_inner_conn_ should be null", K(ret), KP(inner_conn_), KP(store_inner_conn_)); + } else if (FALSE_IT(store_inner_conn_ = static_cast(session->get_inner_conn()))) { + } else if (FALSE_IT(session->set_inner_conn(nullptr))) { } else if (OB_FAIL(ObInnerConnectionLockUtil::create_inner_conn(session, sql_proxy_, inner_conn))) { LOG_WARN("create inner connection failed", K(ret), KPC(session)); } else if (OB_ISNULL(inner_conn)) { @@ -228,7 +230,6 @@ int ObLockFuncContext::open_inner_conn_() * so we put inner conn in session to share it within multi layer nested sql. */ inner_conn_ = inner_conn; - store_inner_conn_ = static_cast(session->get_inner_conn()); session->set_inner_conn(inner_conn); LOG_DEBUG("ObLockFuncContext::open_inner_conn_ successfully", KP(inner_conn_), @@ -243,13 +244,23 @@ int ObLockFuncContext::close_inner_conn_() int ret = OB_SUCCESS; ObSQLSessionInfo *session = my_exec_ctx_->get_my_session(); - if (OB_ISNULL(sql_proxy_) || OB_ISNULL(session) || OB_ISNULL(inner_conn_)) { + if (OB_ISNULL(sql_proxy_) || OB_ISNULL(inner_conn_)) { ret = OB_NOT_INIT; LOG_WARN("sql_proxy or inner_conn of session is NULL", K(ret), KP(sql_proxy_), KP(session), KP(inner_conn_)); } else { - session->set_inner_conn(store_inner_conn_); // restore inner_conn to session before close the tmp inner_conn OZ (sql_proxy_->close(inner_conn_, true)); } + if (OB_ISNULL(session)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("session is NULL", K(ret), KP(session)); + } else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) { + // 1. if inner_conn_ is not null, means that we have created inner_conn successfully before, so we must have already + // set store_inner_conn_ successfully, just restore it to the session. + // 2. if inner_conn_ is null, it's uncertain whether store_inner_conn_ has been set before. If store_inner_conn_ + // is not null, it must have been set. Otherwise, the inner_conn on the session may be null, or it may have existed + // with an error code before store_inner_conn_ being set. At this case, we do not set inner_conn on the session. + session->set_inner_conn(store_inner_conn_); + } sql_proxy_ = nullptr; inner_conn_ = nullptr; store_inner_conn_ = nullptr; diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.cpp b/src/storage/tablelock/ob_lock_inner_connection_util.cpp index 0601699bc7..c38f1460d7 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.cpp +++ b/src/storage/tablelock/ob_lock_inner_connection_util.cpp @@ -462,26 +462,51 @@ int ObInnerConnectionLockUtil::create_inner_conn(sql::ObSQLSessionInfo *session_ observer::ObInnerSQLConnection *&inner_conn) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObObj mysql_mode; + ObObj current_mode; + mysql_mode.set_int(0); + current_mode.set_int(-1); + observer::ObInnerSQLConnectionPool *pool = nullptr; common::sqlclient::ObISQLConnection *conn = nullptr; + lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL); if (OB_ISNULL(session_info) || OB_ISNULL(sql_proxy)) { ret = OB_NOT_INIT; LOG_WARN("session or sql_proxy is NULL", KP(session_info), KP(sql_proxy)); } else if (OB_ISNULL(pool = static_cast(sql_proxy->get_pool()))) { ret = OB_NOT_INIT; LOG_WARN("connection pool is NULL", K(ret)); + } else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) { + LOG_WARN("can not get the compat_mode", KPC(session_info)); + } else if (current_mode != mysql_mode + && OB_FAIL(session_info->update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, mysql_mode))) { + LOG_WARN("update session_info to msyql_mode failed", KR(ret), KPC(session_info)); } else if (common::sqlclient::INNER_POOL != pool->get_type()) { LOG_WARN("connection pool type is not inner", K(ret), K(pool->get_type())); // NOTICE: although we can set is_oracle_mode here, internally it will prioritize referencing // the system variables on the session, so this parameter actually has no effect } else if (OB_FAIL(pool->acquire(session_info, conn, false /* is_oracle_mode */))) { LOG_WARN("acquire connection from inner sql connection pool failed", KR(ret), KPC(session_info)); + } else if (OB_ISNULL(conn)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("acquire new connection but it's null", KR(ret), KPC(session_info)); } else { inner_conn = static_cast(conn); - // the inner_connection must be mysql_mode, so that we can write inner_table - inner_conn->set_mysql_compat_mode(); + // we must use mysql_mode connection to write inner_table + if (OB_UNLIKELY(inner_conn->is_oracle_compat_mode())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("create an oracle mode inner connection", K(ret)); + } } + + if (current_mode != mysql_mode && current_mode.get_int() != -1 && OB_NOT_NULL(session_info) + && OB_TMP_FAIL(session_info->update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) { + ret = OB_SUCCESS == ret ? tmp_ret : ret; + LOG_WARN("failed to update sys variable for compatibility mode", K(current_mode), KPC(session_info)); + } + return ret; } @@ -490,6 +515,9 @@ int ObInnerConnectionLockUtil::execute_write_sql(observer::ObInnerSQLConnection int64_t &affected_rows) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + bool need_reset_sess_mode = false; + bool need_reset_conn_mode = false; // NOTICE: This will be overwritten by the oracle_made_ field on connection, // but for safety reason, we still set up a compat_guard here. @@ -498,10 +526,17 @@ int ObInnerConnectionLockUtil::execute_write_sql(observer::ObInnerSQLConnection if (OB_ISNULL(conn)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("inner_conn is nullptr", K(ret), K(sql)); - } else if (OB_FAIL(conn->execute_write(MTL_ID(), sql.ptr(), affected_rows))) { - LOG_WARN("execute write sql failed", K(ret), K(sql)); + } else { + if (OB_FAIL(set_to_mysql_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) { + LOG_WARN("set to mysql compat_mode failed", K(ret), K(sql)); + } else if (OB_FAIL(conn->execute_write(MTL_ID(), sql.ptr(), affected_rows))) { + LOG_WARN("execute write sql failed", K(ret), K(sql)); + } + if (OB_TMP_FAIL(reset_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) { + LOG_WARN("reset compat_mode failed", K(ret), K(tmp_ret), K(sql)); + ret = COVER_SUCC(tmp_ret); + } } - LOG_DEBUG("ObInnerConnectionLockUtil::execute_write_sql", K(ret), KP(conn), K(conn->is_oracle_compat_mode())); return ret; } @@ -510,6 +545,9 @@ int ObInnerConnectionLockUtil::execute_read_sql(observer::ObInnerSQLConnection * ObISQLClient::ReadResult &res) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + bool need_reset_sess_mode = false; + bool need_reset_conn_mode = false; // NOTICE: This will be overwritten by the oracle_made_ field on connection, // but for safety reason, we still set up a compat_guard here. @@ -518,10 +556,17 @@ int ObInnerConnectionLockUtil::execute_read_sql(observer::ObInnerSQLConnection * if (OB_ISNULL(conn)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("inner_conn is nullptr", K(ret), K(sql)); - } else if (OB_FAIL(conn->execute_read(MTL_ID(), sql.ptr(), res))) { - LOG_WARN("execute read sql failed", K(ret), K(sql)); + } else { + if (OB_FAIL(set_to_mysql_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) { + LOG_WARN("set to mysql compat_mode failed", K(ret), K(sql)); + } else if (OB_FAIL(conn->execute_read(MTL_ID(), sql.ptr(), res))) { + LOG_WARN("execute read sql failed", K(ret), K(sql)); + } + if (OB_TMP_FAIL(reset_compat_mode_(conn, need_reset_sess_mode, need_reset_conn_mode))) { + LOG_WARN("reset compat_mode failed", K(ret), K(tmp_ret), K(sql)); + ret = COVER_SUCC(tmp_ret); + } } - LOG_DEBUG("ObInnerConnectionLockUtil::execute_read_sql", K(ret), KP(conn), K(conn->is_oracle_compat_mode())); return ret; } @@ -919,6 +964,45 @@ int ObInnerConnectionLockUtil::get_org_cluster_id_(ObSQLSessionInfo *session, in } return ret; } + +int ObInnerConnectionLockUtil::set_to_mysql_compat_mode_(observer::ObInnerSQLConnection *conn, bool &need_reset_sess_mode, bool &need_reset_conn_mode) +{ + int ret = OB_SUCCESS; + ObObj current_mode; + ObObj mysql_mode; + current_mode.set_int(-1); + mysql_mode.set_int(0); + need_reset_sess_mode = false; + need_reset_conn_mode = false; + + if (OB_FAIL(conn->get_session().get_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, current_mode))) { + LOG_WARN("can not get the compat_mode", ); + } else if (FALSE_IT(need_reset_sess_mode = (current_mode != mysql_mode))) { + } else if (need_reset_sess_mode + && OB_FAIL(conn->get_session().update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, mysql_mode))) { + LOG_WARN("update compat_mode to mysql_mode failed"); + } else if (conn->is_oracle_compat_mode()) { + need_reset_conn_mode = true; + conn->set_mysql_compat_mode(); + } + return ret; +} + +int ObInnerConnectionLockUtil::reset_compat_mode_(observer::ObInnerSQLConnection *conn, const bool need_reset_sess_mode, const bool need_reset_conn_mode) +{ + int ret = OB_SUCCESS; + ObObj oracle_mode; + oracle_mode.set_int(1); + + if (need_reset_sess_mode + && OB_FAIL(conn->get_session().update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, oracle_mode))) { + LOG_WARN("failed to update sys variable for compatibility mode", K(ret)); + } + if (need_reset_conn_mode) { + conn->set_oracle_compat_mode(); + } + return ret; +} #undef REQUEST_LOCK_4_1 } // tablelock } // transaction diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.h b/src/storage/tablelock/ob_lock_inner_connection_util.h index 9671626e20..6e7a09bd04 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.h +++ b/src/storage/tablelock/ob_lock_inner_connection_util.h @@ -176,6 +176,12 @@ private: const obrpc::ObInnerSQLTransmitArg::InnerSQLOperationType operation_type, observer::ObInnerSQLConnection *conn); static int get_org_cluster_id_(sql::ObSQLSessionInfo *session, int64_t &org_cluster_id); + static int set_to_mysql_compat_mode_(observer::ObInnerSQLConnection *conn, + bool &need_reset_sess_mode, + bool &need_reset_conn_mode); + static int reset_compat_mode_(observer::ObInnerSQLConnection *conn, + const bool need_reset_sess_mode, + const bool need_reset_conn_mode); }; } // tablelock