Optimize the calling method of proxy in recovery ls service

This commit is contained in:
obdev
2023-06-06 11:12:39 +00:00
committed by ob-robot
parent f8ddffa9b9
commit 7eae7f33bb
2 changed files with 100 additions and 86 deletions

View File

@ -43,7 +43,6 @@
#include "storage/tx/ob_tx_log.h" //ObTxLogHeader
#include "storage/tx_storage/ob_ls_service.h" //ObLSService
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil
#include "src/rootserver/ob_rs_event_history_table_operator.h"
namespace oceanbase
@ -56,6 +55,21 @@ using namespace palf;
namespace rootserver
{
#define RESTORE_EVENT_ADD \
int ret_code = OB_SUCCESS; \
switch (ret) { \
case -ER_ACCESS_DENIED_ERROR: \
ret_code = OB_PASSWORD_WRONG; \
case -ER_CONNECT_FAILED: \
ret_code = OB_CONNECT_ERROR; \
} \
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list", \
"tenant_id", tenant_id_, K(ret), \
"ob_error_name", ob_error_name(ret_code), \
"ob_error_str", ob_strerror(ret_code), \
"primary_user_tenant", user_and_tenant.ptr(), \
"primary_ip_list", service_attr.addr_); \
int ObRecoveryLSService::init()
{
int ret = OB_SUCCESS;
@ -83,6 +97,7 @@ void ObRecoveryLSService::destroy()
inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
proxy_ = NULL;
restore_proxy_.destroy();
primary_is_avaliable_= true;
}
@ -873,77 +888,62 @@ void ObRecoveryLSService::try_update_primary_ip_list()
uint64_t primary_tenant_id;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (OB_FAIL(restore_source_mgr.init(tenant_id_, proxy_))) {
LOG_WARN("fail to init restore_source_mgr", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.get_source(item))) {
LOG_WARN("get source failed", K_(tenant_id));
} else if (!check_need_update_ip_list(item)) {
LOG_INFO("there is no log restore source record or the log restore source type is not service" , K(item));
} else if (OB_FAIL(get_restore_source_value(item, standby_source_value))) {
LOG_WARN("fail to get service standby log_restore source value", K(item));
} else if (OB_FAIL(service_attr.parse_service_attr_from_str(standby_source_value))) {
LOG_WARN("fail to parse service attr", K(item), K(standby_source_value));
} else if (!service_attr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service attr is invalid", K(service_attr));
} else if (OB_FAIL(service_attr.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed", K(service_attr));
} else if (OB_FAIL(service_attr.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr.user_.user_name_), K(service_attr.user_.tenant_name_));
} else if (OB_FAIL(proxy.init(tenant_id_/*standby*/,
service_attr.addr_,
user_and_tenant.ptr(),
passwd,
service_attr.user_.mode_ == ObCompatibilityMode::MYSQL_MODE ? OB_SYS_DATABASE_NAME : OB_ORA_SYS_SCHEMA_NAME))) {
LOG_WARN("proxy fail to connect to primary", K_(tenant_id), K(service_attr.addr_), K(user_and_tenant));
bool cur_primary_state = false;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = false;
LOG_WARN("standby recovery ls service state changed");
if (-ER_ACCESS_DENIED_ERROR == ret) {
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_, K(ret),
"ob_error_name", ob_error_name(OB_PASSWORD_WRONG),
"ob_error_str", ob_strerror(OB_PASSWORD_WRONG),
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", service_attr.addr_);
} else if (-ER_CONNECT_FAILED == ret) {
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_, K(ret),
"ob_error_name", ob_error_name(OB_CONNECT_ERROR),
"ob_error_str", ob_strerror(OB_CONNECT_ERROR),
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", service_attr.addr_);
}
}
} else if (OB_FAIL(proxy.get_cluster_id(service_attr.user_.tenant_id_, primary_cluster_id))) {
LOG_WARN("proxy fail to get primary cluster id", K(service_attr.user_.tenant_id_));
} else if (OB_FAIL(proxy.get_tenant_id(service_attr.user_.tenant_name_, primary_tenant_id))) {
LOG_WARN("proxy fail to get primary tenant id", K(service_attr.user_.tenant_name_));
} else if (OB_FAIL(proxy.get_server_ip_list(service_attr.user_.tenant_id_, primary_addrs))) {
LOG_WARN("fail to get primary server ip list", K(service_attr.user_.tenant_id_));
} else if (primary_addrs.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant ip list is empty", K(primary_addrs));
} else if ((primary_cluster_id != service_attr.user_.cluster_id_) || (primary_tenant_id != service_attr.user_.tenant_id_)) {
LOG_WARN("primary cluster id or tenant id has changed",
K(primary_cluster_id), K(service_attr.user_.cluster_id_), K(primary_tenant_id), K(service_attr.user_.tenant_id_));
} else {
bool cur_primary_state = true;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = true;
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_,
"info", "primary connection recovery",
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", primary_addrs);
}
if (OB_FAIL(restore_source_mgr.init(tenant_id_, proxy_))) {
LOG_WARN("fail to init restore_source_mgr", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.get_source(item))) {
LOG_WARN("get source failed", K_(tenant_id));
} else if (!check_need_update_ip_list_(item)) {
LOG_INFO("there is no log restore source record or the log restore source type is not service" , K(item));
} else if (OB_FAIL(get_restore_source_value_(item, standby_source_value))) {
LOG_WARN("fail to get service standby log_restore source value", K(item));
} else if (OB_FAIL(service_attr.parse_service_attr_from_str(standby_source_value))) {
LOG_WARN("fail to parse service attr", K(item), K(standby_source_value));
} else if (!service_attr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service attr is invalid", K(service_attr));
} else if (OB_FAIL(service_attr.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed", K(service_attr));
} else if (OB_FAIL(service_attr.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr.user_.user_name_), K(service_attr.user_.tenant_name_));
} else if (!primary_is_avaliable_ && restore_proxy_.is_inited()) {
LOG_WARN("primary is not avaliable, retry init restore proxy");
restore_proxy_.destroy();
} else if (!restore_proxy_.is_inited() && OB_FAIL(restore_proxy_.init(tenant_id_/*standby*/,
service_attr.addr_,
user_and_tenant.ptr(),
passwd,
service_attr.user_.mode_ == ObCompatibilityMode::MYSQL_MODE ? OB_SYS_DATABASE_NAME : OB_ORA_SYS_SCHEMA_NAME))) {
LOG_WARN("restore_proxy_ fail to connect to primary", K_(tenant_id), K(service_attr.addr_), K(user_and_tenant));
} else if (OB_FAIL(restore_proxy_.get_cluster_id(service_attr.user_.tenant_id_, primary_cluster_id))) {
LOG_WARN("restore proxy fail to get primary cluster id", K(service_attr.user_.tenant_id_));
} else if (OB_FAIL(restore_proxy_.get_tenant_id(service_attr.user_.tenant_name_, primary_tenant_id))) {
LOG_WARN("restore proxy fail to get primary tenant id", K(service_attr.user_.tenant_name_));
} else if (OB_FAIL(restore_proxy_.get_server_ip_list(service_attr.user_.tenant_id_, primary_addrs))) {
LOG_WARN("restore proxy fail to get primary server ip list", K(service_attr.user_.tenant_id_));
} else if (primary_addrs.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant ip list is empty", K(primary_addrs));
} else if ((primary_cluster_id != service_attr.user_.cluster_id_) || (primary_tenant_id != service_attr.user_.tenant_id_)) {
LOG_WARN("primary cluster id or tenant id has changed",
K(primary_cluster_id), K(service_attr.user_.cluster_id_), K(primary_tenant_id), K(service_attr.user_.tenant_id_));
} else {
bool cur_primary_state = true;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = true;
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_,
"info", "primary connection recovery",
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", primary_addrs);
}
bool addr_is_same = false;
addr_is_same = service_attr.compare_addr_(primary_addrs);
if (!addr_is_same) {
common::ObArray<common::ObAddr> tmp_addr = service_attr.addr_;
bool addr_is_same = false;
addr_is_same = service_attr.compare_addr_(primary_addrs);
if (!addr_is_same) {
common::ObArray<common::ObAddr> tmp_addr;
if (OB_FAIL(tmp_addr.assign(service_attr.addr_))) {
LOG_WARN("fail to assign service attr addr", K(service_attr.addr_));
} else {
service_attr.addr_.reset();
ARRAY_FOREACH_N(primary_addrs, idx, cnt) {
if (OB_FAIL(service_attr.addr_.push_back(primary_addrs.at(idx)))) {
@ -953,26 +953,34 @@ void ObRecoveryLSService::try_update_primary_ip_list()
LOG_INFO("primary ip list has changed", K(addr_is_same), K(primary_addrs), K(service_attr.addr_));
if (OB_FAIL(ret)) {
LOG_WARN("fail to update primary ip list");
} else if (OB_FAIL(do_update_restore_source(service_attr, restore_source_mgr))) {
} else if (OB_FAIL(do_update_restore_source_(service_attr, restore_source_mgr))) {
LOG_WARN("fail to update restore source", K(service_attr));
}
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_,
"info", "do update primary ip list",
"primary_user_tenant", user_and_tenant.ptr(),
"new_primary_ip_list", tmp_addr,
"old_primary_ip_list", primary_addrs);
"new_primary_ip_list", primary_addrs,
"old_primary_ip_list", tmp_addr);
}
}
}
if (OB_FAIL(ret)) {
bool cur_primary_state = false;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = false;
LOG_WARN("standby recovery ls service state changed");
RESTORE_EVENT_ADD;
}
}
}
bool ObRecoveryLSService::check_need_update_ip_list(ObLogRestoreSourceItem &item)
bool ObRecoveryLSService::check_need_update_ip_list_(ObLogRestoreSourceItem &item)
{
return item.is_valid() && ObLogRestoreSourceType::SERVICE == item.type_;
}
int ObRecoveryLSService::get_restore_source_value(ObLogRestoreSourceItem &item, ObSqlString &standby_source_value)
int ObRecoveryLSService::get_restore_source_value_(ObLogRestoreSourceItem &item, ObSqlString &standby_source_value)
{
int ret = OB_SUCCESS;
ObSqlString value;
@ -985,7 +993,7 @@ int ObRecoveryLSService::get_restore_source_value(ObLogRestoreSourceItem &item,
return ret;
}
int ObRecoveryLSService::do_update_restore_source(
int ObRecoveryLSService::do_update_restore_source_(
ObRestoreSourceServiceAttr &old_attr,
ObLogRestoreSourceMgr &restore_source_mgr)
{
@ -1005,7 +1013,7 @@ int ObRecoveryLSService::do_update_restore_source(
} else if (ObLogRestoreSourceType::SERVICE != tmp_item.type_) {
ret = OB_EAGAIN;
LOG_WARN("log restore source type is not service", K_(tenant_id), K(tmp_item.type_));
} else if (OB_FAIL(get_restore_source_value(tmp_item, tmp_standby_source_value))) {
} else if (OB_FAIL(get_restore_source_value_(tmp_item, tmp_standby_source_value))) {
LOG_WARN("fail to get service standby log restore source value", K(tmp_item));
} else if (OB_FAIL(tmp_service_attr.parse_service_attr_from_str(tmp_standby_source_value))) {
LOG_WARN("fail to parse service attr", K(tmp_item), K(tmp_standby_source_value));
@ -1018,7 +1026,7 @@ int ObRecoveryLSService::do_update_restore_source(
} else if (!(0 == STRCMP(tmp_service_attr.encrypt_passwd_, old_attr.encrypt_passwd_))) {
ret = OB_EAGAIN;
LOG_WARN("log restore source password may be modified", K_(tenant_id));
} else if (OB_FAIL(update_source_inner_table(updated_value_str, sizeof(updated_value_str), trans, tmp_item))) {
} else if (OB_FAIL(update_source_inner_table_(updated_value_str, sizeof(updated_value_str), trans, tmp_item))) {
LOG_WARN("fail to add service source", K(updated_value_str), K(tmp_item.until_scn_));
}
@ -1032,7 +1040,10 @@ int ObRecoveryLSService::do_update_restore_source(
return ret;
}
int ObRecoveryLSService::update_source_inner_table(char *buf, const int64_t buf_size, ObMySQLTransaction &trans, const ObLogRestoreSourceItem &item)
int ObRecoveryLSService::update_source_inner_table_(char *buf,
const int64_t buf_size,
ObMySQLTransaction &trans,
const ObLogRestoreSourceItem &item)
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;