diff --git a/src/rootserver/ob_ls_recovery_reportor.cpp b/src/rootserver/ob_ls_recovery_reportor.cpp index 8639bffbe..ffacb9a04 100755 --- a/src/rootserver/ob_ls_recovery_reportor.cpp +++ b/src/rootserver/ob_ls_recovery_reportor.cpp @@ -125,6 +125,7 @@ void ObLSRecoveryReportor::run2() ObThreadCondGuard guard(get_cond()); const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); while (!stop_) { + ObCurTraceId::init(GCONF.self_addr_); if (OB_ISNULL(GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema service is empty", KR(ret)); diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 7fe0c809e..a5faea1c9 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -20,9 +20,10 @@ #include "logservice/ob_log_handler.h" //ObLogHandler #include "logservice/palf/log_entry.h" //LogEntry #include "logservice/palf/log_define.h" +#include "logservice/ob_log_service.h"//open_palf #include "share/scn.h"//SCN #include "logservice/ob_garbage_collector.h"//ObGCLSLog -#include "logservice/restoreservice/ob_log_restore_handler.h"//ObLogRestoreHandler +#include "logservice/palf_handle_guard.h"//ObPalfHandleGuard #include "observer/ob_server_struct.h" //GCTX #include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader #include "rootserver/ob_ls_recovery_reportor.h" //ObLSRecoveryReportor @@ -57,6 +58,7 @@ using namespace palf; namespace rootserver { +ERRSIM_POINT_DEF(ERRSIM_END_TRANS_ERROR); #define RESTORE_EVENT_ADD \ int ret_code = OB_SUCCESS; \ switch (ret) { \ @@ -112,12 +114,18 @@ void ObRecoveryLSService::do_work() } else { ObLSRecoveryStatOperator ls_recovery; palf::PalfBufferIterator iterator; + palf::PalfHandleGuard palf_handle_guard; int tmp_ret = OB_SUCCESS; int64_t idle_time_us = 100 * 1000L; SCN start_scn; - while (!has_set_stop()) { + uint64_t thread_idx = get_thread_idx(); + if (0 != thread_idx) { + if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) { + LOG_WARN("failed to init palf handle guard", KR(ret)); + } + } + while (!has_set_stop() && OB_SUCC(ret)) { ObCurTraceId::init(GCONF.self_addr_); - uint64_t thread_idx = get_thread_idx(); ObTenantInfoLoader *tenant_info_loader = MTL(ObTenantInfoLoader*); ObAllTenantInfo tenant_info; //two thread for seed log and recovery_ls_manager @@ -156,7 +164,6 @@ void ObRecoveryLSService::do_work() } } else { DEBUG_SYNC(STOP_RECOVERY_LS_THREAD1); - if (!start_scn.is_valid()) { ObLSRecoveryStat ls_recovery_stat; if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_, @@ -166,12 +173,12 @@ void ObRecoveryLSService::do_work() "report readable_scn while start_scn is invalid"))) { //may recovery end, but readable scn need report LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat)); - } else if (tenant_info.get_recovery_until_scn() == ls_recovery_stat.get_sync_scn()) { + } else if (tenant_info.get_recovery_until_scn() <= ls_recovery_stat.get_sync_scn()) { ret = OB_EAGAIN; if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { // every minute LOG_INFO("has recovered to recovery_until_scn", KR(ret), K(ls_recovery_stat), K(tenant_info)); } - } else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), iterator))) { + } else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), palf_handle_guard, iterator))) { LOG_WARN("failed to seek log iterator", KR(ret), K(ls_recovery_stat)); } else { start_scn = ls_recovery_stat.get_sync_scn(); @@ -187,54 +194,54 @@ void ObRecoveryLSService::do_work() if (OB_FAIL(ret)) { start_scn.reset(); } - - - } + }//end thread1 LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret), K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us)); idle(idle_time_us); - } + ret = OB_SUCCESS; + }//end while } } -int ObRecoveryLSService::seek_log_iterator_(const SCN &sync_scn, PalfBufferIterator &iterator) +int ObRecoveryLSService::init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard) { int ret = OB_SUCCESS; + palf_handle_guard.reset(); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret), K(inited_)); + } else { + logservice::ObLogService *log_service = MTL(logservice::ObLogService *); + if (OB_ISNULL(log_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("log service is null", KR(ret)); + } else if (OB_FAIL(log_service->open_palf(SYS_LS, palf_handle_guard))) { + LOG_WARN("ObLogService open_palf fail", KR(ret), K(tenant_id_)); + } + } + + return ret; +} + +int ObRecoveryLSService::seek_log_iterator_(const SCN &sync_scn, + palf::PalfHandleGuard &palf_handle_guard, + PalfBufferIterator &iterator) +{ + int ret = OB_SUCCESS; + palf::LSN start_lsn(0); if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K(inited_)); } else if (OB_UNLIKELY(!sync_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("sync scn is invalid", KR(ret), K(sync_scn)); - } else { - ObLSService *ls_svr = MTL(ObLSService *); - ObLSHandle ls_handle; - if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) { - LOG_WARN("failed to get ls", KR(ret)); - } else { - ObLogHandler *log_handler = NULL; - ObLS *ls = NULL; - palf::LSN start_lsn(0); - if (OB_ISNULL(ls = ls_handle.get_ls()) - || OB_ISNULL(log_handler = ls->get_log_handler())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls or log handle is null", KR(ret), KP(ls), - KP(log_handler)); - } else { - if (SCN::base_scn() == sync_scn) { - // start_lsn = 0; - } else { - if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) { - LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn)); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(log_handler->seek(start_lsn, iterator))) { - LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn), K(start_lsn)); - } - } - } + } else if (SCN::base_scn() == sync_scn) { + // start_lsn = 0; + } else if (OB_FAIL(palf_handle_guard.locate_by_scn_coarsely(sync_scn, start_lsn))) { + LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn)); + } + if (FAILEDx(palf_handle_guard.seek(start_lsn, iterator))) { + LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn), K(start_lsn)); } return ret; } @@ -278,10 +285,6 @@ int ObRecoveryLSService::process_ls_log_( "SYS log scn beyond recovery_until_scn"))) { LOG_WARN("failed to report_sys_ls_recovery_stat_", KR(ret), K(sync_scn), K(tenant_info), K(log_entry), K(target_lsn), K(start_scn)); - // SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log and reset start_scn - } else if (OB_FAIL(seek_log_iterator_(tenant_info.get_recovery_until_scn(), iterator))) { - LOG_WARN("failed to seek log iterator", KR(ret), K(sync_scn), K(tenant_info), - K(log_entry), K(target_lsn), K(start_scn)); } else { ret = OB_ITER_STOP; LOG_WARN("SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log", @@ -422,6 +425,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC "report recovery stat and has multi data source"))) { LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn)); } + ret = ERRSIM_END_TRANS_ERROR ? : ret; END_TRANSACTION(trans) } } diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index d11d73d33..2dfe0d01b 100755 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -62,6 +62,10 @@ namespace transaction { class ObTxLogBlock; } +namespace palf +{ +class PalfHandleGuard; +} namespace rootserver { /*description: @@ -83,7 +87,9 @@ public: private: //get log iterator by start_scn //interface for thread0 + int init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard); int seek_log_iterator_(const share::SCN &syn_scn, + palf::PalfHandleGuard &palf_handle_guard, palf::PalfBufferIterator &iterator); int process_ls_log_(const ObAllTenantInfo &tenant_info, share::SCN &start_scn,