Optimize the method of restore proxy to create connection pool

This commit is contained in:
obdev
2023-09-12 04:40:43 +00:00
committed by ob-robot
parent 2b3d63338e
commit 54c00ceedf
2 changed files with 82 additions and 45 deletions

View File

@ -29,6 +29,7 @@
#include "share/ob_thread_mgr.h"
#include "logservice/palf/palf_options.h"
#include "share/oracle_errno.h"
#include <mysql.h>
namespace oceanbase
{
@ -206,7 +207,8 @@ ObLogRestoreProxyUtil::ObLogRestoreProxyUtil() :
user_name_(),
user_password_(),
db_name_(),
sql_proxy_()
sql_proxy_(),
is_oracle_mode_(false)
{}
ObLogRestoreProxyUtil::~ObLogRestoreProxyUtil()
@ -282,6 +284,7 @@ void ObLogRestoreProxyUtil::destroy()
user_name_.reset();
user_password_.reset();
db_name_.reset();
is_oracle_mode_ = false;
}
int ObLogRestoreProxyUtil::refresh_conn(const common::ObIArray<common::ObAddr> &addr_array,
@ -312,53 +315,20 @@ int ObLogRestoreProxyUtil::try_init(const uint64_t tenant_id,
const char *user_password)
{
int ret = OB_SUCCESS;
const char *MYSQL_DB = "OCEANBASE";
const char *ORACLE_DB = "SYS";
const char *db_name = nullptr;
ObLogRestoreMySQLProvider tmp_server_prover;
if (OB_SUCC(init(tenant_id, server_list, user_name, user_password, MYSQL_DB))) {
RESTORE_RETRY(
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
ObSqlString sql;
if (OB_FAIL(sql.assign_fmt("SELECT 1"))) {
LOG_WARN("fail to generate sql");
} else if (OB_FAIL(sql_proxy_.read(result, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(sql));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query result is null", K(tenant_id), K(sql));
} else if (OB_FAIL(result.get_result()->next())) {
LOG_WARN("get result next failed", K(sql));
} else {
LOG_INFO("proxy connect to primary oceanabse db success");
}
}
)
if (OB_FAIL(tmp_server_prover.init(server_list))) {
LOG_WARN("server_prover_ init failed", K(tenant_id), K(server_list));
} else if (OB_FAIL(detect_tenant_mode_(&tmp_server_prover, user_name, user_password))) {
LOG_WARN("detect tenant mode failed", KR(ret), K_(user_name));
} else if (OB_FALSE_IT(db_name = is_oracle_mode_ ? OB_ORA_SYS_SCHEMA_NAME : OB_SYS_DATABASE_NAME)) {
} else if (OB_FAIL(init(tenant_id, server_list, user_name, user_password, db_name))) {
LOG_WARN("[RESTORE PROXY] fail to init restore proxy");
}
if (OB_FAIL(ret)) {
LOG_WARN("proxy connect to primary oceanbase db failed, then try connect to sys db");
(void)destroy();
if (OB_SUCC(init(tenant_id, server_list, user_name, user_password, ORACLE_DB))) {
RESTORE_RETRY(
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
ObSqlString sql;
if (OB_FAIL(sql.assign_fmt("SELECT 1 FROM DUAL"))) {
LOG_WARN("fail to generate sql");
} else if (OB_FAIL(sql_proxy_.read(result, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(sql));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query result is null", K(tenant_id), K(sql));
} else if (OB_FAIL(result.get_result()->next())) {
LOG_WARN("get result next failed", K(sql));
} else {
LOG_INFO("proxy connect to sys db success");
}
}
)
}
}
if (OB_FAIL(ret)) {
LOG_WARN("proxy connect to primary db failed");
LOG_WARN("[RESTORE PROXY] proxy connect to primary db failed");
RESTORE_PROXY_USER_ERROR("connection");
destroy();
}
@ -740,5 +710,70 @@ void ObLogRestoreProxyUtil::destroy_tg_()
LOG_INFO("destroy_tg_ succ", K(origin_tg_id));
}
}
int ObLogRestoreProxyUtil::detect_tenant_mode_(common::sqlclient::ObMySQLServerProvider *server_provider,
const char *user_name,
const char *user_password)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(server_provider)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("[RESTORE PROXY] invalid server_provider", KR(ret));
} else {
const int64_t svr_cnt = server_provider->get_server_count();
if (OB_UNLIKELY(0 >= svr_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("[RESTORE PROXY] expect valid svr_cnt", K(svr_cnt));
} else {
bool detect_succ = false;
for (int idx = 0; OB_SUCC(ret) && ! detect_succ && idx < svr_cnt; idx++) {
MYSQL *mysql;
common::ObAddr server;
if (OB_FAIL(server_provider->get_server(idx, server))) {
LOG_WARN("[RESTORE PROXY] failed to get server", KR(ret), K(idx), K(svr_cnt));
} else {
const static int MAX_IP_BUFFER_LEN = 32;
char host[MAX_IP_BUFFER_LEN] = { 0 };
const char *default_db_name = "";
int32_t port = server.get_port();
if (!server.ip_to_string(host, MAX_IP_BUFFER_LEN)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("fail to get host.", K(server), K(ret));
} else if (NULL == (mysql = mysql_init(NULL))) {
LOG_ERROR("mysql_init fail", KR(ret));
ret = OB_ERR_UNEXPECTED;
} else {
if (mysql != mysql_real_connect(mysql, host, user_name, user_password, default_db_name, port, NULL, 0)) {
LOG_WARN("mysql connect failed", "mysql_error", mysql_error(mysql), K(host), K(port));
ret = OB_NEED_RETRY;
} else {
#ifdef OB_BUILD_ORACLE_PARSER
is_oracle_mode_ = mysql->oracle_mode;
#else
is_oracle_mode_ = false;
#endif
detect_succ = true;
LOG_INFO("[RESTORE PROXY][DETECT_TENANT_MODE] detect tenant mode success", KR(ret));
}
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(mysql)) {
mysql_close(mysql);
mysql = NULL;
}
}
if (OB_UNLIKELY(!detect_succ)) {
ret = OB_CONNECT_ERROR;
LOG_WARN("[RESTORE PROXY][DETECT_TENANT_MODE] connect to all server in tenant endpoint list failed", KR(ret), K(svr_cnt));
}
}
}
return ret;
}
} // namespace share
} // namespace oceanbase

View File

@ -153,6 +153,7 @@ public:
private:
bool is_user_changed_(const char *user_name, const char *user_password, const char *db_name);
void destroy_tg_();
int detect_tenant_mode_(common::sqlclient::ObMySQLServerProvider *server_provider, const char *user_name, const char *user_password);
private:
int construct_server_ip_list(const common::ObSqlString &sql, common::ObIArray<common::ObAddr> &addrs);
bool inited_;
@ -164,6 +165,7 @@ private:
common::ObFixedLengthString<common::OB_MAX_PASSWORD_LENGTH + 1> user_password_;
common::ObFixedLengthString<common::OB_MAX_DATABASE_NAME_BUF_LENGTH> db_name_;
common::ObMySQLProxy sql_proxy_;
bool is_oracle_mode_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreProxyUtil);