[FEAT MERGE] log4200

Co-authored-by: zhjc1124 <zhjc1124@gmail.com>
Co-authored-by: BinChenn <binchenn.bc@gmail.com>
Co-authored-by: oceanoverflow <oceanoverflow@gmail.com>
This commit is contained in:
obdev
2023-05-06 08:15:43 +00:00
committed by ob-robot
parent f0fdf277f1
commit d6c6e05727
450 changed files with 33015 additions and 2243 deletions

View File

@ -27,6 +27,7 @@
#include "rootserver/ob_primary_ls_service.h" //ObTenantLSInfo
#include "rootserver/ob_tenant_recovery_reportor.h" //ObTenantRecoveryReportor
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
#include "rootserver/ob_create_standby_from_net_actor.h" // ObCreateStandbyFromNetActor
#include "share/ls/ob_ls_life_manager.h" //ObLSLifeManger
#include "share/ls/ob_ls_operator.h" //ObLSAttr
#include "share/ls/ob_ls_recovery_stat_operator.h" //ObLSRecoveryLSStatOperator
@ -42,6 +43,8 @@
#include "storage/tx/ob_tx_log.h" //ObTxLogHeader
#include "storage/tx_storage/ob_ls_service.h" //ObLSService
#include "storage/tx_storage/ob_ls_handle.h" //ObLSHandle
#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil
#include "src/rootserver/ob_rs_event_history_table_operator.h"
namespace oceanbase
{
@ -80,6 +83,7 @@ void ObRecoveryLSService::destroy()
inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
proxy_ = NULL;
primary_is_avaliable_= true;
}
void ObRecoveryLSService::do_work()
@ -115,16 +119,17 @@ void ObRecoveryLSService::do_work()
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("failed to process recovery ls manager", KR(ret), KR(tmp_ret));
}
if (tenant_info.is_standby()) {
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) {
(void)try_update_primary_ip_list();
}
}
} else {
if (!start_scn.is_valid()) {
ObLSRecoveryStat ls_recovery_stat;
if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_,
SYS_LS, false, ls_recovery_stat, *proxy_))) {
LOG_WARN("failed to load sys recovery stat", KR(ret), K(tenant_id_));
} else if (SCN::base_scn() == ls_recovery_stat.get_sync_scn()) {
//等待物理恢复设置完系统日志流初始同步点之后开始迭代
ret = OB_EAGAIN;
LOG_WARN("restore init sync scn has not setted, need wait", KR(ret), K(ls_recovery_stat));
} else if (OB_FAIL(report_sys_ls_recovery_stat_(ls_recovery_stat.get_sync_scn()))) {
//may recovery end, but readable scn need report
LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat));
@ -177,16 +182,25 @@ int ObRecoveryLSService::seek_log_iterator_(const SCN &sync_scn, PalfBufferItera
} else {
ObLogHandler *log_handler = NULL;
ObLS *ls = NULL;
palf::LSN start_lsn;
palf::LSN start_lsn(0);
if (OB_ISNULL(ls = ls_handle.get_ls())
|| OB_ISNULL(log_handler = ls->get_log_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls or log handle is null", KR(ret), KP(ls),
KP(log_handler));
} else if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) {
LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn));
} else if (OB_FAIL(log_handler->seek(start_lsn, iterator))) {
LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn));
} else {
if (SCN::base_scn() == sync_scn) {
// start_lsn = 0;
} else {
if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) {
LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(log_handler->seek(start_lsn, iterator))) {
LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn), K(start_lsn));
}
}
}
}
@ -280,13 +294,13 @@ int ObRecoveryLSService::process_ls_log_(
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (OB_FAIL(report_sys_ls_recovery_stat_(sync_scn))) {
LOG_WARN("failed to report ls recovery stat", KR(ret), K(sync_scn));
LOG_WARN("failed to report ls recovery stat", KR(ret), K(sync_scn), K(iterator));
}
} else if (OB_SUCC(ret)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("iterator must be end", KR(ret));
LOG_WARN("iterator must be end", KR(ret), K(iterator));
} else {
LOG_WARN("failed to get next log", KR(ret));
LOG_WARN("failed to get next log", KR(ret), K(iterator));
}
return ret;
@ -421,7 +435,13 @@ int ObRecoveryLSService::get_min_data_version_(uint64_t &compatible)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config result is null", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
} else if (OB_FAIL(result.get_result()->next())) {
LOG_WARN("get result next failed", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
if (OB_ITER_END == ret) {
LOG_WARN("compatible not set, it is net standby tenant", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
ret = OB_SUCCESS;
compatible = 0;
} else {
LOG_WARN("get result next failed", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
}
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is null", KR(ret), K_(tenant_id), K(exec_tenant_id), K(sql));
@ -602,6 +622,7 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(
const share::ObLSAttr &ls_attr, const SCN &sync_scn)
{
int ret = OB_SUCCESS;
bool has_user_ls = true;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
@ -617,7 +638,9 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(
} else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operatin_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls recovery must stop while pre tenant dropping", KR(ret), K(ls_attr));
} else {
} else if (OB_FAIL(ObCreateStandbyFromNetActor::check_has_user_ls(tenant_id_, proxy_, has_user_ls))) {
LOG_WARN("check_has_user_ls failed", KR(ret), K(tenant_id_));
} else if (has_user_ls) {
SCN user_scn;
ObLSRecoveryStatOperator ls_recovery;
if (OB_FAIL(ls_recovery.get_user_ls_sync_scn(tenant_id_, *proxy_, user_scn))) {
@ -830,6 +853,202 @@ int ObRecoveryLSService::report_sys_ls_recovery_stat_(const SCN &sync_scn)
return ret;
}
void ObRecoveryLSService::try_update_primary_ip_list()
{
int ret = OB_SUCCESS;
ObLogRestoreSourceMgr restore_source_mgr;
ObLogRestoreSourceItem item;
common::ObArray<common::ObAddr> primary_addrs;
ObSqlString standby_source_value;
ObRestoreSourceServiceAttr service_attr;
ObMySQLTransaction trans;
ObSqlString user_and_tenant;
int64_t primary_cluster_id;
uint64_t primary_tenant_id;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (OB_FAIL(restore_source_mgr.init(tenant_id_, proxy_))) {
LOG_WARN("fail to init restore_source_mgr", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.get_source(item))) {
LOG_WARN("get source failed", K_(tenant_id));
} else if (!check_need_update_ip_list(item)) {
LOG_INFO("there is no log restore source record or the log restore source type is not service" , K(item));
} else if (OB_FAIL(get_restore_source_value(item, standby_source_value))) {
LOG_WARN("fail to get service standby log_restore source value", K(item));
} else if (OB_FAIL(service_attr.parse_service_attr_from_str(standby_source_value))) {
LOG_WARN("fail to parse service attr", K(item), K(standby_source_value));
} else if (!service_attr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service attr is invalid", K(service_attr));
} else if (OB_FAIL(service_attr.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed", K(service_attr));
} else if (OB_FAIL(service_attr.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr.user_.user_name_), K(service_attr.user_.tenant_name_));
} else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr.addr_, user_and_tenant.ptr(), passwd))) {
LOG_WARN("proxy fail to connect to primary", K_(tenant_id), K(service_attr.addr_), K(user_and_tenant));
bool cur_primary_state = false;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = false;
LOG_WARN("standby recovery ls service state changed");
if (-ER_ACCESS_DENIED_ERROR == ret) {
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_, K(ret),
"ob_error_name", ob_error_name(OB_PASSWORD_WRONG),
"ob_error_str", ob_strerror(OB_PASSWORD_WRONG),
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", service_attr.addr_);
} else if (-ER_CONNECT_FAILED == ret) {
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_, K(ret),
"ob_error_name", ob_error_name(OB_CONNECT_ERROR),
"ob_error_str", ob_strerror(OB_CONNECT_ERROR),
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", service_attr.addr_);
}
}
} else if (OB_FAIL(proxy.get_cluster_id(service_attr.user_.tenant_id_, primary_cluster_id))) {
LOG_WARN("proxy fail to get primary cluster id", K(service_attr.user_.tenant_id_));
} else if (OB_FAIL(proxy.get_tenant_id(service_attr.user_.tenant_name_, primary_tenant_id))) {
LOG_WARN("proxy fail to get primary tenant id", K(service_attr.user_.tenant_name_));
} else if (OB_FAIL(proxy.get_server_ip_list(service_attr.user_.tenant_id_, primary_addrs))) {
LOG_WARN("fail to get primary server ip list", K(service_attr.user_.tenant_id_));
} else if (primary_addrs.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant ip list is empty", K(primary_addrs));
} else if ((primary_cluster_id != service_attr.user_.cluster_id_) || (primary_tenant_id != service_attr.user_.tenant_id_)) {
LOG_WARN("primary cluster id or tenant id has changed",
K(primary_cluster_id), K(service_attr.user_.cluster_id_), K(primary_tenant_id), K(service_attr.user_.tenant_id_));
} else {
bool cur_primary_state = true;
if (cur_primary_state != primary_is_avaliable_) {
primary_is_avaliable_ = true;
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_,
"info", "primary connection recovery",
"primary_user_tenant", user_and_tenant.ptr(),
"primary_ip_list", primary_addrs);
}
bool addr_is_same = false;
addr_is_same = service_attr.compare_addr_(primary_addrs);
if (!addr_is_same) {
common::ObArray<common::ObAddr> tmp_addr = service_attr.addr_;
service_attr.addr_.reset();
ARRAY_FOREACH_N(primary_addrs, idx, cnt) {
if (OB_FAIL(service_attr.addr_.push_back(primary_addrs.at(idx)))) {
LOG_WARN("fail to push back primary addr", K(primary_addrs.at(idx)));
}
}
LOG_INFO("primary ip list has changed", K(addr_is_same), K(primary_addrs), K(service_attr.addr_));
if (OB_FAIL(ret)) {
LOG_WARN("fail to update primary ip list");
} else if (OB_FAIL(do_update_restore_source(service_attr, restore_source_mgr))) {
LOG_WARN("fail to update restore source", K(service_attr));
}
ROOTSERVICE_EVENT_ADD("root_service", "update_primary_ip_list",
"tenant_id", tenant_id_,
"info", "do update primary ip list",
"primary_user_tenant", user_and_tenant.ptr(),
"new_primary_ip_list", tmp_addr,
"old_primary_ip_list", primary_addrs);
}
}
}
}
bool ObRecoveryLSService::check_need_update_ip_list(ObLogRestoreSourceItem &item)
{
return item.is_valid() && ObLogRestoreSourceType::SERVICE == item.type_;
}
int ObRecoveryLSService::get_restore_source_value(ObLogRestoreSourceItem &item, ObSqlString &standby_source_value)
{
int ret = OB_SUCCESS;
ObSqlString value;
if (!item.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log restore source item is invalid");
} else if (OB_FAIL(standby_source_value.assign(item.value_))) {
LOG_WARN("fail to assign standby source value", K(item.value_));
}
return ret;
}
int ObRecoveryLSService::do_update_restore_source(
ObRestoreSourceServiceAttr &old_attr,
ObLogRestoreSourceMgr &restore_source_mgr)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObLogRestoreSourceItem tmp_item;
ObRestoreSourceServiceAttr tmp_service_attr;
ObSqlString tmp_standby_source_value;
char updated_value_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
if (OB_FAIL(old_attr.gen_service_attr_str(updated_value_str, sizeof(updated_value_str)))) {
LOG_WARN("gen service_attr_str failed", K(updated_value_str), K(old_attr));
} else if (OB_FAIL(trans.start(proxy_, gen_meta_tenant_id(tenant_id_)))) {
LOG_WARN("fail to start trans", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.get_source_for_update(tmp_item, trans))) {
LOG_WARN("fail to get log restore source when double check" , K(tmp_item));
} else if (ObLogRestoreSourceType::SERVICE != tmp_item.type_) {
ret = OB_EAGAIN;
LOG_WARN("log restore source type is not service", K_(tenant_id), K(tmp_item.type_));
} else if (OB_FAIL(get_restore_source_value(tmp_item, tmp_standby_source_value))) {
LOG_WARN("fail to get service standby log restore source value", K(tmp_item));
} else if (OB_FAIL(tmp_service_attr.parse_service_attr_from_str(tmp_standby_source_value))) {
LOG_WARN("fail to parse service attr", K(tmp_item), K(tmp_standby_source_value));
} else if (!tmp_service_attr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service attr is invalid", K(tmp_service_attr));
} else if (!(tmp_service_attr.user_ == old_attr.user_)) {
ret = OB_EAGAIN;
LOG_WARN("log restore source record may be modified", K(tmp_service_attr.user_), K(old_attr.user_));
} else if (!(0 == STRCMP(tmp_service_attr.encrypt_passwd_, old_attr.encrypt_passwd_))) {
ret = OB_EAGAIN;
LOG_WARN("log restore source password may be modified", K_(tenant_id));
} else if (OB_FAIL(update_source_inner_table(updated_value_str, sizeof(updated_value_str), trans, tmp_item))) {
LOG_WARN("fail to add service source", K(updated_value_str), K(tmp_item.until_scn_));
}
int temp_ret = OB_SUCCESS;
if (trans.is_started()) {
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
LOG_WARN("trans end failed", "is_commit", OB_SUCCESS == ret, K(temp_ret));
ret = (OB_SUCC(ret)) ? temp_ret : ret;
}
}
return ret;
}
int ObRecoveryLSService::update_source_inner_table(char *buf, const int64_t buf_size, ObMySQLTransaction &trans, const ObLogRestoreSourceItem &item)
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument when get source for update",K(buf_size));
} else {
uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
ObDMLSqlSplicer dml;
ObSqlString sql;
if (OB_FAIL(dml.add_pk_column(OB_STR_TENANT_ID, tenant_id_))) {
LOG_WARN("failed to add column", K_(tenant_id), K(item));
} else if (OB_FAIL(dml.add_pk_column(OB_STR_LOG_RESTORE_SOURCE_ID, item.id_))) {
LOG_WARN("failed to add column", K_(tenant_id), K(item));
} else if (OB_FAIL(dml.add_column(OB_STR_LOG_RESTORE_SOURCE_VALUE, buf))) {
LOG_WARN("failed to add column", K_(tenant_id), K(item));
} else if (OB_FAIL(dml.splice_update_sql(OB_ALL_LOG_RESTORE_SOURCE_TNAME, sql))) {
LOG_WARN("fill update source value sql failed", K(item));
} else if (OB_FAIL(trans.write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to exec sql", K(sql), K_(tenant_id));
}
}
return ret;
}
}//end of namespace rootserver
}//end of namespace oceanbase