[4.2][Standby]V$OB_LS_LOG_RESTORE_STATUS support primary upgrade and primary tenant dropped scenarios

This commit is contained in:
obdev
2023-08-02 11:12:57 +00:00
committed by ob-robot
parent 0f8302babc
commit f68351d625
5 changed files with 188 additions and 21 deletions

View File

@ -58,7 +58,7 @@ const char *restore_comment_str[static_cast<int>(RestoreSyncStatus::MAX_RESTORE_
"Log source is unreachable, the log source access point may be unavailable",
"Fetch log time out",
"Restore suspend, the standby has synchronized to recovery until scn",
"Standby binary version is lower than primary data version, standby need upgrade",
"Standby binary version is lower than primary data version, standby need to upgrade",
"Primary tenant has been dropped",
"Unexpected exceptions",
};
@ -927,8 +927,8 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s
CLOG_LOG(WARN, "fail to get restore error");
} else if (error_exist) {
CLOG_LOG(TRACE, "start to mark restore sync error", K(trace_id), K(ret_code), K(context_.error_context_.error_type_));
if (OB_FAIL(get_err_code_and_message_(ret_code, context_.error_context_.error_type_, sync_status))) {
CLOG_LOG(WARN, "fail to get err code and message", K(ret_code), K(context_.error_context_.error_type_), K(sync_status));
if (OB_FAIL(get_restore_sync_status(ret_code, context_.error_context_.error_type_, sync_status))) {
CLOG_LOG(WARN, "fail to get sync status", K(ret_code), K(context_.error_context_.error_type_), K(sync_status));
} else {
restore_status_info.sync_status_ = sync_status;
}
@ -944,8 +944,8 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s
restore_status_info.err_code_ = ret_code;
restore_status_info.sync_lsn_ = lsn.val_;
restore_status_info.sync_scn_ = scn;
if (OB_FAIL(restore_status_info.comment_.assign_fmt("%s", restore_comment_str[int(restore_status_info.sync_status_)]))) {
CLOG_LOG(WARN, "fail to assign comment", K(sync_status));
if (OB_FAIL(restore_status_info.get_restore_comment())) {
CLOG_LOG(WARN, "fail to get comment", K(sync_status));
} else {
CLOG_LOG(TRACE, "success to get error code and message", K(restore_status_info));
}
@ -953,7 +953,7 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s
return ret;
}
int ObLogRestoreHandler::get_err_code_and_message_(int ret_code,
int ObLogRestoreHandler::get_restore_sync_status(int ret_code,
ObLogRestoreErrorContext::ErrorType error_type,
RestoreSyncStatus &sync_status)
{
@ -1014,6 +1014,27 @@ RestoreStatusInfo::RestoreStatusInfo()
comment_.reset();
}
int RestoreStatusInfo::set(const share::ObLSID &ls_id,
const palf::LSN &lsn, const share::SCN &scn, int err_code,
const RestoreSyncStatus sync_status)
{
int ret = OB_SUCCESS;
if (!ls_id.is_valid() || !lsn.is_valid() || !scn.is_valid() || OB_SUCCESS == err_code) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", KR(ret), K(ls_id), K(lsn), K(scn), K(err_code));
} else {
ls_id_ = ls_id.id();
sync_scn_= scn;
sync_lsn_ = lsn.val_;
sync_status_ = sync_status;
err_code_ = err_code;
if (OB_FAIL(get_restore_comment())) {
CLOG_LOG(WARN, "failed to assign comment", KR(ret), K(sync_status));
}
}
return ret;
}
void RestoreStatusInfo::reset()
{
ls_id_ = share::ObLSID::INVALID_LS_ID;
@ -1040,6 +1061,31 @@ int RestoreStatusInfo::restore_sync_status_to_string(char *str_buf, const int64_
return ret;
}
int RestoreStatusInfo::get_restore_comment()
{
int ret = OB_SUCCESS;
if (OB_FAIL(comment_.assign_fmt("%s", restore_comment_str[int(sync_status_)]))) {
CLOG_LOG(WARN, "fail to assign comment", K_(sync_status));
} else {
CLOG_LOG(TRACE, "success to get restore status comment", K_(sync_status));
}
return ret;
}
int RestoreStatusInfo::assign(const RestoreStatusInfo &other)
{
int ret = OB_SUCCESS;
ls_id_ = other.ls_id_;
sync_lsn_ = other.sync_lsn_;
sync_scn_ = other.sync_scn_;
sync_status_ = other.sync_status_;
err_code_ = other.err_code_;
if (OB_FAIL(comment_.assign(other.comment_))) {
CLOG_LOG(WARN, "fail to assign comment");
}
return ret;
}
bool RestoreStatusInfo::is_valid() const
{
return ls_id_ != share::ObLSID::INVALID_LS_ID

View File

@ -99,6 +99,11 @@ public:
bool is_valid() const;
void reset();
int restore_sync_status_to_string(char *str_buf, const int64_t str_len);
int set(const share::ObLSID &ls_id,
const palf::LSN &lsn, const share::SCN &scn, int err_code,
const RestoreSyncStatus sync_status);
int get_restore_comment();
int assign(const RestoreStatusInfo &other);
TO_STRING_KV(K_(ls_id), K_(sync_lsn), K_(sync_scn), K_(sync_status), K_(err_code), K_(comment));
public:
@ -230,6 +235,7 @@ public:
int diagnose(RestoreDiagnoseInfo &diagnose_info) const;
int refresh_error_context();
int get_ls_restore_status_info(RestoreStatusInfo &restore_status_info);
int get_restore_sync_status(int ret_code, const ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &sync_status);
TO_STRING_KV(K_(is_inited), K_(is_in_stop_state), K_(id), K_(proposal_id), K_(role), KP_(parent), K_(context), K_(restore_context));
private:
@ -243,7 +249,6 @@ private:
int check_restore_to_newest_from_archive_(ObLogArchivePieceContext &piece_context,
const palf::LSN &end_lsn, const share::SCN &end_scn, share::SCN &archive_scn);
bool restore_to_end_unlock_() const;
int get_err_code_and_message_(int ret_code, ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &err_type);
private:
ObRemoteLogParent *parent_;

View File

@ -22,6 +22,7 @@
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "storage/tx_storage/ob_ls_map.h" // ObLSIterator
#include "storage/tx_storage/ob_ls_service.h" // ObLSService
#include "rootserver/ob_recovery_ls_service.h" //ObLSRecoveryService
using namespace oceanbase::share;
@ -67,12 +68,10 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row)
ObLSService *ls_svr = MTL(ObLSService*);
if (is_user_tenant(MTL_ID())) {
if (OB_ISNULL(ls_svr)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "mtl ObLSService should not be null", K(ret));
} else if (OB_FAIL(ls_svr->get_ls_iter(guard, ObLSGetMod::LOG_MOD))) {
SERVER_LOG(WARN, "get ls iter failed", K(ret));
} else if (OB_ISNULL(iter = guard.get_ptr())) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "ls iter is NULL", K(ret), K(iter));
} else {
while (OB_SUCC(ret)) {
@ -89,6 +88,7 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row)
} else {
SERVER_LOG(TRACE, "start to iterate this log_stream", K(MTL_ID()), K(ls->get_ls_id()));
logservice::RestoreStatusInfo restore_status_info;
logservice::RestoreStatusInfo sys_restore_status_info;
restore_handler = ls->get_log_restore_handler();
if (OB_ISNULL(restore_handler)) {
SERVER_LOG(WARN, "restore handler is NULL", K(ret), K(ls));
@ -96,11 +96,36 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row)
SERVER_LOG(WARN, "fail to get ls restore status info");
} else if (!restore_status_info.is_valid()) {
SERVER_LOG(WARN, "restore status info is invalid", K(restore_status_info));
} else if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) {
SERVER_LOG(WARN, "fail to insert ls restore status info");
} else {
SERVER_LOG(TRACE, "iterate this log_stream success");
scanner_.add_row(cur_row_);
} else if (!ls->is_sys_ls()) { // not sys ls
if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) {
SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info));
} else {
SERVER_LOG(TRACE, "iterate user log_stream success", K(ls));
scanner_.add_row(cur_row_);
}
} else { // sys ls
rootserver::ObRecoveryLSService *ls_recovery_svr = MTL(rootserver::ObRecoveryLSService*);
if (OB_ISNULL(ls_recovery_svr)) {
SERVER_LOG(WARN, "ls recovery service is NULL", K(ret), K(ls));
} else if (OB_FAIL(ls_recovery_svr->get_sys_restore_status(sys_restore_status_info))) {
SERVER_LOG(WARN, "get sys restore status failed", K(ls));
// use restore_status_info if get sys ls restore status failed
if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) {
SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info));
} else {
SERVER_LOG(TRACE, "insert ls restore status info success after get sys restore status failed", K(ls));
scanner_.add_row(cur_row_);
}
} else if (sys_restore_status_info.is_valid()
&& OB_FAIL(insert_ls_restore_status_info_(sys_restore_status_info))) {
SERVER_LOG(WARN, "fail to insert ls restore status info", K(sys_restore_status_info));
} else if (!sys_restore_status_info.is_valid()
&& OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) {
SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info));
} else {
SERVER_LOG(TRACE, "iterate sys log_stream success", K(ls));
scanner_.add_row(cur_row_);
}
}
}
} // while

View File

@ -93,6 +93,7 @@ int ObRecoveryLSService::init()
tenant_id_ = MTL_ID();
proxy_ = GCTX.sql_proxy_;
last_report_ts_ = OB_INVALID_TIMESTAMP;
restore_status_.reset();
inited_ = true;
}
@ -109,6 +110,33 @@ void ObRecoveryLSService::destroy()
restore_proxy_.destroy();
last_report_ts_ = OB_INVALID_TIMESTAMP;
primary_is_avaliable_= true;
restore_status_.reset();
}
int ObRecoveryLSService::get_sys_restore_status(logservice::RestoreStatusInfo &restore_status)
{
int ret = OB_SUCCESS;
palf::PalfHandleGuard palf_handle_guard;
palf::LSN report_lsn;
SCN report_scn;
if (has_set_stop()) {
ret = OB_IN_STOP_STATE;
LOG_WARN("thread is in stop state");
} else if (!restore_status_.is_valid()) {
LOG_INFO("restore status is invalid", K(restore_status_));
} else if (OB_FAIL(restore_status.assign(restore_status_))) {
LOG_WARN("restore status assign failed", K(restore_status_));
} else if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) {
LOG_WARN("init palf handle guard failed");
} else if (OB_FAIL(palf_handle_guard.get_end_lsn(report_lsn))) {
LOG_WARN("fail to get end lsn");
} else if (OB_FAIL(palf_handle_guard.get_end_scn(report_scn))) {
LOG_WARN("fail to get end scn");
} else {
restore_status.sync_lsn_ = report_lsn.val_;
restore_status.sync_scn_ = report_scn;
}
return ret;
}
void ObRecoveryLSService::do_work()
@ -149,6 +177,7 @@ void ObRecoveryLSService::do_work()
LOG_WARN("tenant info is primary", KR(ret), K(tenant_info));
} else if (OB_FAIL(check_can_do_recovery_(tenant_id_))) {
LOG_WARN("can not do recovery now", KR(ret), K(tenant_id_));
restore_status_.reset();
} else if (0 == thread_idx) {
idle_time_us = 10 * 1000 * 1000L;
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD0);
@ -203,10 +232,11 @@ void ObRecoveryLSService::do_work()
}
}//end thread1
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret),
K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us));
K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us), K_(restore_status));
idle(idle_time_us);
ret = OB_SUCCESS;
}//end while
restore_status_.reset();
}
}
@ -297,6 +327,7 @@ int ObRecoveryLSService::process_ls_log_(
LOG_WARN("SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log",
KR(ret), K(sync_scn), K(tenant_info), K(log_entry), K(target_lsn), K(start_scn));
start_scn.reset();
restore_status_.reset(); // need to reset restore status if iterate to recovery end
}
} else if (OB_FAIL(header.deserialize(log_buf, HEADER_SIZE, log_pos))) {
LOG_WARN("failed to deserialize", KR(ret), K(HEADER_SIZE));
@ -406,7 +437,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) {
const ObTxBufferNode &node = source_data.at(i);
if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) {
if (OB_FAIL(process_upgrade_log_(node))) {
if (OB_FAIL(process_upgrade_log_(sync_scn, node))) {
LOG_WARN("failed to process_upgrade_log_", KR(ret), K(node));
}
} else if (ObTxDataSourceType::LS_TABLE != node.get_data_source_type()
@ -442,14 +473,15 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
return ret;
}
int ObRecoveryLSService::process_upgrade_log_(const ObTxBufferNode &node)
int ObRecoveryLSService::process_upgrade_log_(
const share::SCN &sync_scn, const ObTxBufferNode &node)
{
int ret = OB_SUCCESS;
uint64_t standby_data_version = 0;
if (!node.is_valid()) {
if (!node.is_valid() || !sync_scn.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("node is invalid", KR(ret), K(node));
LOG_WARN("node is invalid", KR(ret), K(node), K(sync_scn));
} else {
ObStandbyUpgrade primary_data_version;
int64_t pos = 0;
@ -473,6 +505,10 @@ int ObRecoveryLSService::process_upgrade_log_(const ObTxBufferNode &node)
LOG_ERROR("standby version is not new enough to recover primary clog", KR(ret),
K(primary_data_version), K(standby_data_version));
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(init_restore_status(sync_scn, OB_ERR_RESTORE_STANDBY_VERSION_LAG))) {
LOG_WARN("failed to init restore status", KR(tmp_ret), K(sync_scn));
}
}
}
}
@ -713,6 +749,10 @@ int ObRecoveryLSService::process_ls_table_in_trans_(const transaction::ObTxBuffe
if (share::is_ls_tenant_drop_pre_op(ls_attr.get_ls_operation_type())) {
ret = OB_ITER_STOP;
LOG_WARN("can not process ls operator after tenant dropping", K(ls_attr));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(init_restore_status(sync_scn, OB_ERR_RESTORE_PRIMARY_TENANT_DROPPED))) {
LOG_WARN("failed to init restore status", KR(tmp_ret), K(sync_scn), KR(tmp_ret));
}
} else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operation_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls recovery must stop while pre tenant dropping", KR(ret), K(ls_attr));
@ -750,6 +790,7 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(const SCN &sync_scn)
ret = OB_NEED_WAIT;
LOG_WARN("can not process ls operator, need wait other ls sync", KR(ret),
K(user_scn), K(sync_scn));
restore_status_.reset();
}
}
@ -968,6 +1009,10 @@ int ObRecoveryLSService::report_sys_ls_recovery_stat_in_trans_(
K(ls_recovery_stat), K(tenant_info));
} else {
last_report_ts_ = ObTimeUtility::current_time();
if (!only_update_readable_scn) {
//如果汇报了sync_scn,需要把restore_status重置掉
restore_status_.reset();
}
}
CLICK();
FLOG_INFO("report sys ls recovery stat", KR(ret), K(sync_scn), K(only_update_readable_scn),
@ -1323,6 +1368,47 @@ int ObRecoveryLSService::update_source_inner_table_(char *buf,
return ret;
}
int ObRecoveryLSService::init_restore_status(const share::SCN &sync_scn, int err_code)
{
int ret = OB_SUCCESS;
if (!sync_scn.is_valid() || OB_SUCCESS == err_code) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KR(err_code), K(sync_scn));
} else {
palf::LSN start_lsn(0);
ObSqlString sync_comment;
RestoreSyncStatus sync_status;
ObLSService *ls_svr = MTL(ObLSService *);
ObLSHandle ls_handle;
if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) {
LOG_WARN("failed to get ls", KR(ret));
} else {
ObLogHandler *log_handler = NULL;
ObLogRestoreHandler *restore_handler = NULL;
ObLS *ls = NULL;
palf::LSN start_lsn;
if (OB_ISNULL(ls = ls_handle.get_ls()) || OB_ISNULL(log_handler = ls->get_log_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls or log handle is null", KR(ret), KP(ls), KP(log_handler));
} else if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) {
LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn));
} else if (OB_ISNULL(restore_handler = ls->get_log_restore_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get restore handler", KR(ret), K(sync_scn));
} else if (OB_FAIL(restore_handler->get_restore_sync_status(err_code,
ObLogRestoreErrorContext::ErrorType::FETCH_LOG,
sync_status))) {
LOG_WARN("fail to get error code and message", KR(ret), K(sync_scn), K_(restore_status));
} else if (OB_FAIL(restore_status_.set(SYS_LS, start_lsn, sync_scn, err_code, sync_status))) {
LOG_WARN("failed to init restore status", KR(ret), K(start_lsn), K(err_code), K(sync_scn));
} else {
LOG_TRACE("init sys restore status success", K(restore_status_));
}
}
}
return ret;
}
}//end of namespace rootserver
}//end of namespace oceanbase

View File

@ -16,6 +16,7 @@
#include "logservice/ob_log_base_type.h"//ObIRoleChangeSubHandler ObICheckpointSubHandler ObIReplaySubHandler
#include "logservice/palf/lsn.h"//palf::LSN
#include "logservice/palf/palf_iterator.h" //PalfBufferIterator
#include "logservice/restoreservice/ob_log_restore_handler.h"//RestoreStatusInfo
#include "ob_primary_ls_service.h" //ObTenantThreadHelper
#include "lib/lock/ob_spin_lock.h" //ObSpinLock
#include "storage/tx/ob_multi_data_source.h" //ObTxBufferNode
@ -78,10 +79,11 @@ class ObRecoveryLSService : public ObTenantThreadHelper
{
public:
ObRecoveryLSService() : inited_(false), tenant_id_(OB_INVALID_TENANT_ID), proxy_(NULL),
restore_proxy_(), last_report_ts_(OB_INVALID_TIMESTAMP), primary_is_avaliable_(true) {}
restore_proxy_(), last_report_ts_(OB_INVALID_TIMESTAMP), primary_is_avaliable_(true), restore_status_() {}
virtual ~ObRecoveryLSService() {}
int init();
void destroy();
int get_sys_restore_status(logservice::RestoreStatusInfo &restore_status);
virtual void do_work() override;
DEFINE_MTL_FUNC(ObRecoveryLSService)
private:
@ -94,7 +96,8 @@ private:
int process_ls_log_(const ObAllTenantInfo &tenant_info,
share::SCN &start_scn,
palf::PalfBufferIterator &iterator);
int process_upgrade_log_(const transaction::ObTxBufferNode &node);
int process_upgrade_log_(const share::SCN &sync_scn,
const transaction::ObTxBufferNode &node);
int process_gc_log_(logservice::ObGCLSLog &gc_log,
const share::SCN &syn_scn);
int process_ls_tx_log_(transaction::ObTxLogBlock &tx_log,
@ -129,6 +132,7 @@ private:
int process_ls_transfer_task_in_trans_(const transaction::ObTxBufferNode &node,
const share::SCN &sync_scn, common::ObMySQLTransaction &trans);
int init_restore_status(const share::SCN &sync_scn, int err_code);
//thread1
int do_standby_balance_();
int do_ls_balance_task_();
@ -147,6 +151,7 @@ private:
ObLogRestoreProxyUtil restore_proxy_;
int64_t last_report_ts_;
bool primary_is_avaliable_;
logservice::RestoreStatusInfo restore_status_;
};
}
}