From f68351d6251f352b4594c852ec77e1492c166f8d Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 2 Aug 2023 11:12:57 +0000 Subject: [PATCH] [4.2][Standby]V$OB_LS_LOG_RESTORE_STATUS support primary upgrade and primary tenant dropped scenarios --- .../restoreservice/ob_log_restore_handler.cpp | 58 +++++++++-- .../restoreservice/ob_log_restore_handler.h | 7 +- .../ob_all_virtual_ls_log_restore_status.cpp | 39 ++++++-- src/rootserver/ob_recovery_ls_service.cpp | 96 ++++++++++++++++++- src/rootserver/ob_recovery_ls_service.h | 9 +- 5 files changed, 188 insertions(+), 21 deletions(-) diff --git a/src/logservice/restoreservice/ob_log_restore_handler.cpp b/src/logservice/restoreservice/ob_log_restore_handler.cpp index 178b97582a..fdb5eb0f16 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.cpp +++ b/src/logservice/restoreservice/ob_log_restore_handler.cpp @@ -58,7 +58,7 @@ const char *restore_comment_str[static_cast(RestoreSyncStatus::MAX_RESTORE_ "Log source is unreachable, the log source access point may be unavailable", "Fetch log time out", "Restore suspend, the standby has synchronized to recovery until scn", - "Standby binary version is lower than primary data version, standby need upgrade", + "Standby binary version is lower than primary data version, standby need to upgrade", "Primary tenant has been dropped", "Unexpected exceptions", }; @@ -927,8 +927,8 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s CLOG_LOG(WARN, "fail to get restore error"); } else if (error_exist) { CLOG_LOG(TRACE, "start to mark restore sync error", K(trace_id), K(ret_code), K(context_.error_context_.error_type_)); - if (OB_FAIL(get_err_code_and_message_(ret_code, context_.error_context_.error_type_, sync_status))) { - CLOG_LOG(WARN, "fail to get err code and message", K(ret_code), K(context_.error_context_.error_type_), K(sync_status)); + if (OB_FAIL(get_restore_sync_status(ret_code, context_.error_context_.error_type_, sync_status))) { + 
CLOG_LOG(WARN, "fail to get sync status", K(ret_code), K(context_.error_context_.error_type_), K(sync_status)); } else { restore_status_info.sync_status_ = sync_status; } @@ -944,8 +944,8 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s restore_status_info.err_code_ = ret_code; restore_status_info.sync_lsn_ = lsn.val_; restore_status_info.sync_scn_ = scn; - if (OB_FAIL(restore_status_info.comment_.assign_fmt("%s", restore_comment_str[int(restore_status_info.sync_status_)]))) { - CLOG_LOG(WARN, "fail to assign comment", K(sync_status)); + if (OB_FAIL(restore_status_info.get_restore_comment())) { + CLOG_LOG(WARN, "fail to get comment", K(sync_status)); } else { CLOG_LOG(TRACE, "success to get error code and message", K(restore_status_info)); } @@ -953,7 +953,7 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s return ret; } -int ObLogRestoreHandler::get_err_code_and_message_(int ret_code, +int ObLogRestoreHandler::get_restore_sync_status(int ret_code, ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &sync_status) { @@ -1014,6 +1014,27 @@ RestoreStatusInfo::RestoreStatusInfo() comment_.reset(); } +int RestoreStatusInfo::set(const share::ObLSID &ls_id, + const palf::LSN &lsn, const share::SCN &scn, int err_code, + const RestoreSyncStatus sync_status) +{ + int ret = OB_SUCCESS; + if (!ls_id.is_valid() || !lsn.is_valid() || !scn.is_valid() || OB_SUCCESS == err_code) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", KR(ret), K(ls_id), K(lsn), K(scn), K(err_code)); + } else { + ls_id_ = ls_id.id(); + sync_scn_= scn; + sync_lsn_ = lsn.val_; + sync_status_ = sync_status; + err_code_ = err_code; + if (OB_FAIL(get_restore_comment())) { + CLOG_LOG(WARN, "failed to assign comment", KR(ret), K(sync_status)); + } + } + return ret; +} + void RestoreStatusInfo::reset() { ls_id_ = share::ObLSID::INVALID_LS_ID; @@ -1040,6 +1061,31 @@ int 
RestoreStatusInfo::restore_sync_status_to_string(char *str_buf, const int64_ return ret; } +int RestoreStatusInfo::get_restore_comment() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(comment_.assign_fmt("%s", restore_comment_str[int(sync_status_)]))) { + CLOG_LOG(WARN, "fail to assign comment", K_(sync_status)); + } else { + CLOG_LOG(TRACE, "success to get restore status comment", K_(sync_status)); + } + return ret; +} + +int RestoreStatusInfo::assign(const RestoreStatusInfo &other) +{ + int ret = OB_SUCCESS; + ls_id_ = other.ls_id_; + sync_lsn_ = other.sync_lsn_; + sync_scn_ = other.sync_scn_; + sync_status_ = other.sync_status_; + err_code_ = other.err_code_; + if (OB_FAIL(comment_.assign(other.comment_))) { + CLOG_LOG(WARN, "fail to assign comment"); + } + return ret; +} + bool RestoreStatusInfo::is_valid() const { return ls_id_ != share::ObLSID::INVALID_LS_ID diff --git a/src/logservice/restoreservice/ob_log_restore_handler.h b/src/logservice/restoreservice/ob_log_restore_handler.h index 4e14d52b3c..a36a7e06ef 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.h +++ b/src/logservice/restoreservice/ob_log_restore_handler.h @@ -99,6 +99,11 @@ public: bool is_valid() const; void reset(); int restore_sync_status_to_string(char *str_buf, const int64_t str_len); + int set(const share::ObLSID &ls_id, + const palf::LSN &lsn, const share::SCN &scn, int err_code, + const RestoreSyncStatus sync_status); + int get_restore_comment(); + int assign(const RestoreStatusInfo &other); TO_STRING_KV(K_(ls_id), K_(sync_lsn), K_(sync_scn), K_(sync_status), K_(err_code), K_(comment)); public: @@ -230,6 +235,7 @@ public: int diagnose(RestoreDiagnoseInfo &diagnose_info) const; int refresh_error_context(); int get_ls_restore_status_info(RestoreStatusInfo &restore_status_info); + int get_restore_sync_status(int ret_code, const ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &sync_status); TO_STRING_KV(K_(is_inited), K_(is_in_stop_state), K_(id), 
K_(proposal_id), K_(role), KP_(parent), K_(context), K_(restore_context)); private: @@ -243,7 +249,6 @@ private: int check_restore_to_newest_from_archive_(ObLogArchivePieceContext &piece_context, const palf::LSN &end_lsn, const share::SCN &end_scn, share::SCN &archive_scn); bool restore_to_end_unlock_() const; - int get_err_code_and_message_(int ret_code, ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &err_type); private: ObRemoteLogParent *parent_; diff --git a/src/observer/virtual_table/ob_all_virtual_ls_log_restore_status.cpp b/src/observer/virtual_table/ob_all_virtual_ls_log_restore_status.cpp index ddffb6b9b6..17a806590a 100644 --- a/src/observer/virtual_table/ob_all_virtual_ls_log_restore_status.cpp +++ b/src/observer/virtual_table/ob_all_virtual_ls_log_restore_status.cpp @@ -22,6 +22,7 @@ #include "lib/mysqlclient/ob_mysql_proxy.h" #include "storage/tx_storage/ob_ls_map.h" // ObLSIterator #include "storage/tx_storage/ob_ls_service.h" // ObLSService +#include "rootserver/ob_recovery_ls_service.h" //ObLSRecoveryService using namespace oceanbase::share; @@ -67,12 +68,10 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row) ObLSService *ls_svr = MTL(ObLSService*); if (is_user_tenant(MTL_ID())) { if (OB_ISNULL(ls_svr)) { - ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "mtl ObLSService should not be null", K(ret)); } else if (OB_FAIL(ls_svr->get_ls_iter(guard, ObLSGetMod::LOG_MOD))) { SERVER_LOG(WARN, "get ls iter failed", K(ret)); } else if (OB_ISNULL(iter = guard.get_ptr())) { - ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "ls iter is NULL", K(ret), K(iter)); } else { while (OB_SUCC(ret)) { @@ -89,6 +88,7 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row) } else { SERVER_LOG(TRACE, "start to iterate this log_stream", K(MTL_ID()), K(ls->get_ls_id())); logservice::RestoreStatusInfo restore_status_info; + logservice::RestoreStatusInfo sys_restore_status_info; restore_handler = 
ls->get_log_restore_handler(); if (OB_ISNULL(restore_handler)) { SERVER_LOG(WARN, "restore handler is NULL", K(ret), K(ls)); @@ -96,11 +96,36 @@ int ObVirtualLSLogRestoreStatus::inner_get_next_row(common::ObNewRow *&row) SERVER_LOG(WARN, "fail to get ls restore status info"); } else if (!restore_status_info.is_valid()) { SERVER_LOG(WARN, "restore status info is invalid", K(restore_status_info)); - } else if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) { - SERVER_LOG(WARN, "fail to insert ls restore status info"); - } else { - SERVER_LOG(TRACE, "iterate this log_stream success"); - scanner_.add_row(cur_row_); + } else if (!ls->is_sys_ls()) { // not sys ls + if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) { + SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info)); + } else { + SERVER_LOG(TRACE, "iterate user log_stream success", K(ls)); + scanner_.add_row(cur_row_); + } + } else { // sys ls + rootserver::ObRecoveryLSService *ls_recovery_svr = MTL(rootserver::ObRecoveryLSService*); + if (OB_ISNULL(ls_recovery_svr)) { + SERVER_LOG(WARN, "ls recovery service is NULL", K(ret), K(ls)); + } else if (OB_FAIL(ls_recovery_svr->get_sys_restore_status(sys_restore_status_info))) { + SERVER_LOG(WARN, "get sys restore status failed", K(ls)); + // use restore_status_info if get sys ls restore status failed + if (OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) { + SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info)); + } else { + SERVER_LOG(TRACE, "insert ls restore status info success after get sys restore status failed", K(ls)); + scanner_.add_row(cur_row_); + } + } else if (sys_restore_status_info.is_valid() + && OB_FAIL(insert_ls_restore_status_info_(sys_restore_status_info))) { + SERVER_LOG(WARN, "fail to insert ls restore status info", K(sys_restore_status_info)); + } else if (!sys_restore_status_info.is_valid() + && 
OB_FAIL(insert_ls_restore_status_info_(restore_status_info))) { + SERVER_LOG(WARN, "fail to insert ls restore status info", K(restore_status_info)); + } else { + SERVER_LOG(TRACE, "iterate sys log_stream success", K(ls)); + scanner_.add_row(cur_row_); + } } } } // while diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 8e6e8a9d4a..bb149ebba2 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -93,6 +93,7 @@ int ObRecoveryLSService::init() tenant_id_ = MTL_ID(); proxy_ = GCTX.sql_proxy_; last_report_ts_ = OB_INVALID_TIMESTAMP; + restore_status_.reset(); inited_ = true; } @@ -109,6 +110,33 @@ void ObRecoveryLSService::destroy() restore_proxy_.destroy(); last_report_ts_ = OB_INVALID_TIMESTAMP; primary_is_avaliable_= true; + restore_status_.reset(); +} + +int ObRecoveryLSService::get_sys_restore_status(logservice::RestoreStatusInfo &restore_status) +{ + int ret = OB_SUCCESS; + palf::PalfHandleGuard palf_handle_guard; + palf::LSN report_lsn; + SCN report_scn; + if (has_set_stop()) { + ret = OB_IN_STOP_STATE; + LOG_WARN("thread is in stop state"); + } else if (!restore_status_.is_valid()) { + LOG_INFO("restore status is invalid", K(restore_status_)); + } else if (OB_FAIL(restore_status.assign(restore_status_))) { + LOG_WARN("restore status assign failed", K(restore_status_)); + } else if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) { + LOG_WARN("init palf handle guard failed"); + } else if (OB_FAIL(palf_handle_guard.get_end_lsn(report_lsn))) { + LOG_WARN("fail to get end lsn"); + } else if (OB_FAIL(palf_handle_guard.get_end_scn(report_scn))) { + LOG_WARN("fail to get end scn"); + } else { + restore_status.sync_lsn_ = report_lsn.val_; + restore_status.sync_scn_ = report_scn; + } + return ret; } void ObRecoveryLSService::do_work() @@ -149,6 +177,7 @@ void ObRecoveryLSService::do_work() LOG_WARN("tenant info is primary", KR(ret), K(tenant_info)); } else if 
(OB_FAIL(check_can_do_recovery_(tenant_id_))) { LOG_WARN("can not do recovery now", KR(ret), K(tenant_id_)); + restore_status_.reset(); } else if (0 == thread_idx) { idle_time_us = 10 * 1000 * 1000L; DEBUG_SYNC(STOP_RECOVERY_LS_THREAD0); @@ -203,10 +232,11 @@ void ObRecoveryLSService::do_work() } }//end thread1 LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret), - K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us)); + K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us), K_(restore_status)); idle(idle_time_us); ret = OB_SUCCESS; }//end while + restore_status_.reset(); } } @@ -297,6 +327,7 @@ int ObRecoveryLSService::process_ls_log_( LOG_WARN("SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log", KR(ret), K(sync_scn), K(tenant_info), K(log_entry), K(target_lsn), K(start_scn)); start_scn.reset(); + restore_status_.reset(); // need to reset restore status if iterate to recovery end } } else if (OB_FAIL(header.deserialize(log_buf, HEADER_SIZE, log_pos))) { LOG_WARN("failed to deserialize", KR(ret), K(HEADER_SIZE)); @@ -406,7 +437,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) { const ObTxBufferNode &node = source_data.at(i); if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) { - if (OB_FAIL(process_upgrade_log_(node))) { + if (OB_FAIL(process_upgrade_log_(sync_scn, node))) { LOG_WARN("failed to process_upgrade_log_", KR(ret), K(node)); } } else if (ObTxDataSourceType::LS_TABLE != node.get_data_source_type() @@ -442,14 +473,15 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC return ret; } -int ObRecoveryLSService::process_upgrade_log_(const ObTxBufferNode &node) +int ObRecoveryLSService::process_upgrade_log_( + const share::SCN &sync_scn, const ObTxBufferNode &node) { int ret = OB_SUCCESS; uint64_t standby_data_version = 0; - if (!node.is_valid()) { + if 
(!node.is_valid() || !sync_scn.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("node is invalid", KR(ret), K(node)); + LOG_WARN("node is invalid", KR(ret), K(node), K(sync_scn)); } else { ObStandbyUpgrade primary_data_version; int64_t pos = 0; @@ -473,6 +505,10 @@ int ObRecoveryLSService::process_upgrade_log_(const ObTxBufferNode &node) LOG_ERROR("standby version is not new enough to recover primary clog", KR(ret), K(primary_data_version), K(standby_data_version)); } + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(init_restore_status(sync_scn, OB_ERR_RESTORE_STANDBY_VERSION_LAG))) { + LOG_WARN("failed to init restore status", KR(tmp_ret), K(sync_scn)); + } } } } @@ -713,6 +749,10 @@ int ObRecoveryLSService::process_ls_table_in_trans_(const transaction::ObTxBuffe if (share::is_ls_tenant_drop_pre_op(ls_attr.get_ls_operation_type())) { ret = OB_ITER_STOP; LOG_WARN("can not process ls operator after tenant dropping", K(ls_attr)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(init_restore_status(sync_scn, OB_ERR_RESTORE_PRIMARY_TENANT_DROPPED))) { + LOG_WARN("failed to init restore status", KR(tmp_ret), K(sync_scn), KR(tmp_ret)); + } } else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operation_type())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls recovery must stop while pre tenant dropping", KR(ret), K(ls_attr)); @@ -750,6 +790,7 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(const SCN &sync_scn) ret = OB_NEED_WAIT; LOG_WARN("can not process ls operator, need wait other ls sync", KR(ret), K(user_scn), K(sync_scn)); + restore_status_.reset(); } } @@ -968,6 +1009,10 @@ int ObRecoveryLSService::report_sys_ls_recovery_stat_in_trans_( K(ls_recovery_stat), K(tenant_info)); } else { last_report_ts_ = ObTimeUtility::current_time(); + if (!only_update_readable_scn) { + // sync_scn has been reported, so restore_status must be reset + restore_status_.reset(); + } } CLICK(); FLOG_INFO("report sys ls recovery stat", KR(ret), K(sync_scn), K(only_update_readable_scn), @@ -1323,6 +1368,47 @@ int 
ObRecoveryLSService::update_source_inner_table_(char *buf, return ret; } +int ObRecoveryLSService::init_restore_status(const share::SCN &sync_scn, int err_code) +{ + int ret = OB_SUCCESS; + if (!sync_scn.is_valid() || OB_SUCCESS == err_code) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), KR(err_code), K(sync_scn)); + } else { + palf::LSN start_lsn(0); + ObSqlString sync_comment; + RestoreSyncStatus sync_status; + ObLSService *ls_svr = MTL(ObLSService *); + ObLSHandle ls_handle; + if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) { + LOG_WARN("failed to get ls", KR(ret)); + } else { + ObLogHandler *log_handler = NULL; + ObLogRestoreHandler *restore_handler = NULL; + ObLS *ls = NULL; + palf::LSN start_lsn; + if (OB_ISNULL(ls = ls_handle.get_ls()) || OB_ISNULL(log_handler = ls->get_log_handler())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls or log handle is null", KR(ret), KP(ls), KP(log_handler)); + } else if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) { + LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn)); + } else if (OB_ISNULL(restore_handler = ls->get_log_restore_handler())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get restore handler", KR(ret), K(sync_scn)); + } else if (OB_FAIL(restore_handler->get_restore_sync_status(err_code, + ObLogRestoreErrorContext::ErrorType::FETCH_LOG, + sync_status))) { + LOG_WARN("fail to get error code and message", KR(ret), K(sync_scn), K_(restore_status)); + } else if (OB_FAIL(restore_status_.set(SYS_LS, start_lsn, sync_scn, err_code, sync_status))) { + LOG_WARN("failed to init restore status", KR(ret), K(start_lsn), K(err_code), K(sync_scn)); + } else { + LOG_TRACE("init sys restore status success", K(restore_status_)); + } + } + } + return ret; +} + }//end of namespace rootserver }//end of namespace oceanbase diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index 1464e9d0fd..5eff671254 100755 --- 
a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -16,6 +16,7 @@ #include "logservice/ob_log_base_type.h"//ObIRoleChangeSubHandler ObICheckpointSubHandler ObIReplaySubHandler #include "logservice/palf/lsn.h"//palf::LSN #include "logservice/palf/palf_iterator.h" //PalfBufferIterator +#include "logservice/restoreservice/ob_log_restore_handler.h"//RestoreStatusInfo #include "ob_primary_ls_service.h" //ObTenantThreadHelper #include "lib/lock/ob_spin_lock.h" //ObSpinLock #include "storage/tx/ob_multi_data_source.h" //ObTxBufferNode @@ -78,10 +79,11 @@ class ObRecoveryLSService : public ObTenantThreadHelper { public: ObRecoveryLSService() : inited_(false), tenant_id_(OB_INVALID_TENANT_ID), proxy_(NULL), - restore_proxy_(), last_report_ts_(OB_INVALID_TIMESTAMP), primary_is_avaliable_(true) {} + restore_proxy_(), last_report_ts_(OB_INVALID_TIMESTAMP), primary_is_avaliable_(true), restore_status_() {} virtual ~ObRecoveryLSService() {} int init(); void destroy(); + int get_sys_restore_status(logservice::RestoreStatusInfo &restore_status); virtual void do_work() override; DEFINE_MTL_FUNC(ObRecoveryLSService) private: @@ -94,7 +96,8 @@ private: int process_ls_log_(const ObAllTenantInfo &tenant_info, share::SCN &start_scn, palf::PalfBufferIterator &iterator); - int process_upgrade_log_(const transaction::ObTxBufferNode &node); + int process_upgrade_log_(const share::SCN &sync_scn, + const transaction::ObTxBufferNode &node); int process_gc_log_(logservice::ObGCLSLog &gc_log, const share::SCN &syn_scn); int process_ls_tx_log_(transaction::ObTxLogBlock &tx_log, @@ -129,6 +132,7 @@ private: int process_ls_transfer_task_in_trans_(const transaction::ObTxBufferNode &node, const share::SCN &sync_scn, common::ObMySQLTransaction &trans); + int init_restore_status(const share::SCN &sync_scn, int err_code); //thread1 int do_standby_balance_(); int do_ls_balance_task_(); @@ -147,6 +151,7 @@ private: ObLogRestoreProxyUtil restore_proxy_; int64_t 
last_report_ts_; bool primary_is_avaliable_; + logservice::RestoreStatusInfo restore_status_; }; } }