fix recovery core
This commit is contained in:
@ -125,6 +125,7 @@ void ObLSRecoveryReportor::run2()
|
|||||||
ObThreadCondGuard guard(get_cond());
|
ObThreadCondGuard guard(get_cond());
|
||||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
|
||||||
while (!stop_) {
|
while (!stop_) {
|
||||||
|
ObCurTraceId::init(GCONF.self_addr_);
|
||||||
if (OB_ISNULL(GCTX.schema_service_)) {
|
if (OB_ISNULL(GCTX.schema_service_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("schema service is empty", KR(ret));
|
LOG_WARN("schema service is empty", KR(ret));
|
||||||
|
|||||||
@ -20,9 +20,10 @@
|
|||||||
#include "logservice/ob_log_handler.h" //ObLogHandler
|
#include "logservice/ob_log_handler.h" //ObLogHandler
|
||||||
#include "logservice/palf/log_entry.h" //LogEntry
|
#include "logservice/palf/log_entry.h" //LogEntry
|
||||||
#include "logservice/palf/log_define.h"
|
#include "logservice/palf/log_define.h"
|
||||||
|
#include "logservice/ob_log_service.h"//open_palf
|
||||||
#include "share/scn.h"//SCN
|
#include "share/scn.h"//SCN
|
||||||
#include "logservice/ob_garbage_collector.h"//ObGCLSLog
|
#include "logservice/ob_garbage_collector.h"//ObGCLSLog
|
||||||
#include "logservice/restoreservice/ob_log_restore_handler.h"//ObLogRestoreHandler
|
#include "logservice/palf_handle_guard.h"//ObPalfHandleGuard
|
||||||
#include "observer/ob_server_struct.h" //GCTX
|
#include "observer/ob_server_struct.h" //GCTX
|
||||||
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
|
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
|
||||||
#include "rootserver/ob_ls_recovery_reportor.h" //ObLSRecoveryReportor
|
#include "rootserver/ob_ls_recovery_reportor.h" //ObLSRecoveryReportor
|
||||||
@ -57,6 +58,7 @@ using namespace palf;
|
|||||||
namespace rootserver
|
namespace rootserver
|
||||||
{
|
{
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_END_TRANS_ERROR);
|
||||||
#define RESTORE_EVENT_ADD \
|
#define RESTORE_EVENT_ADD \
|
||||||
int ret_code = OB_SUCCESS; \
|
int ret_code = OB_SUCCESS; \
|
||||||
switch (ret) { \
|
switch (ret) { \
|
||||||
@ -112,12 +114,18 @@ void ObRecoveryLSService::do_work()
|
|||||||
} else {
|
} else {
|
||||||
ObLSRecoveryStatOperator ls_recovery;
|
ObLSRecoveryStatOperator ls_recovery;
|
||||||
palf::PalfBufferIterator iterator;
|
palf::PalfBufferIterator iterator;
|
||||||
|
palf::PalfHandleGuard palf_handle_guard;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
int64_t idle_time_us = 100 * 1000L;
|
int64_t idle_time_us = 100 * 1000L;
|
||||||
SCN start_scn;
|
SCN start_scn;
|
||||||
while (!has_set_stop()) {
|
uint64_t thread_idx = get_thread_idx();
|
||||||
|
if (0 != thread_idx) {
|
||||||
|
if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) {
|
||||||
|
LOG_WARN("failed to init palf handle guard", KR(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (!has_set_stop() && OB_SUCC(ret)) {
|
||||||
ObCurTraceId::init(GCONF.self_addr_);
|
ObCurTraceId::init(GCONF.self_addr_);
|
||||||
uint64_t thread_idx = get_thread_idx();
|
|
||||||
ObTenantInfoLoader *tenant_info_loader = MTL(ObTenantInfoLoader*);
|
ObTenantInfoLoader *tenant_info_loader = MTL(ObTenantInfoLoader*);
|
||||||
ObAllTenantInfo tenant_info;
|
ObAllTenantInfo tenant_info;
|
||||||
//two thread for seed log and recovery_ls_manager
|
//two thread for seed log and recovery_ls_manager
|
||||||
@ -156,7 +164,6 @@ void ObRecoveryLSService::do_work()
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD1);
|
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD1);
|
||||||
|
|
||||||
if (!start_scn.is_valid()) {
|
if (!start_scn.is_valid()) {
|
||||||
ObLSRecoveryStat ls_recovery_stat;
|
ObLSRecoveryStat ls_recovery_stat;
|
||||||
if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_,
|
if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_,
|
||||||
@ -166,12 +173,12 @@ void ObRecoveryLSService::do_work()
|
|||||||
"report readable_scn while start_scn is invalid"))) {
|
"report readable_scn while start_scn is invalid"))) {
|
||||||
//may recovery end, but readable scn need report
|
//may recovery end, but readable scn need report
|
||||||
LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat));
|
LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat));
|
||||||
} else if (tenant_info.get_recovery_until_scn() == ls_recovery_stat.get_sync_scn()) {
|
} else if (tenant_info.get_recovery_until_scn() <= ls_recovery_stat.get_sync_scn()) {
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { // every minute
|
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { // every minute
|
||||||
LOG_INFO("has recovered to recovery_until_scn", KR(ret), K(ls_recovery_stat), K(tenant_info));
|
LOG_INFO("has recovered to recovery_until_scn", KR(ret), K(ls_recovery_stat), K(tenant_info));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), iterator))) {
|
} else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), palf_handle_guard, iterator))) {
|
||||||
LOG_WARN("failed to seek log iterator", KR(ret), K(ls_recovery_stat));
|
LOG_WARN("failed to seek log iterator", KR(ret), K(ls_recovery_stat));
|
||||||
} else {
|
} else {
|
||||||
start_scn = ls_recovery_stat.get_sync_scn();
|
start_scn = ls_recovery_stat.get_sync_scn();
|
||||||
@ -187,54 +194,54 @@ void ObRecoveryLSService::do_work()
|
|||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
start_scn.reset();
|
start_scn.reset();
|
||||||
}
|
}
|
||||||
|
}//end thread1
|
||||||
|
|
||||||
}
|
|
||||||
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret),
|
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret),
|
||||||
K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us));
|
K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us));
|
||||||
idle(idle_time_us);
|
idle(idle_time_us);
|
||||||
}
|
ret = OB_SUCCESS;
|
||||||
|
}//end while
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObRecoveryLSService::seek_log_iterator_(const SCN &sync_scn, PalfBufferIterator &iterator)
|
int ObRecoveryLSService::init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
palf_handle_guard.reset();
|
||||||
|
if (OB_UNLIKELY(!inited_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("not init", KR(ret), K(inited_));
|
||||||
|
} else {
|
||||||
|
logservice::ObLogService *log_service = MTL(logservice::ObLogService *);
|
||||||
|
if (OB_ISNULL(log_service)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("log service is null", KR(ret));
|
||||||
|
} else if (OB_FAIL(log_service->open_palf(SYS_LS, palf_handle_guard))) {
|
||||||
|
LOG_WARN("ObLogService open_palf fail", KR(ret), K(tenant_id_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObRecoveryLSService::seek_log_iterator_(const SCN &sync_scn,
|
||||||
|
palf::PalfHandleGuard &palf_handle_guard,
|
||||||
|
PalfBufferIterator &iterator)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
palf::LSN start_lsn(0);
|
||||||
if (OB_UNLIKELY(!inited_)) {
|
if (OB_UNLIKELY(!inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", KR(ret), K(inited_));
|
LOG_WARN("not init", KR(ret), K(inited_));
|
||||||
} else if (OB_UNLIKELY(!sync_scn.is_valid())) {
|
} else if (OB_UNLIKELY(!sync_scn.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("sync scn is invalid", KR(ret), K(sync_scn));
|
LOG_WARN("sync scn is invalid", KR(ret), K(sync_scn));
|
||||||
} else {
|
} else if (SCN::base_scn() == sync_scn) {
|
||||||
ObLSService *ls_svr = MTL(ObLSService *);
|
// start_lsn = 0;
|
||||||
ObLSHandle ls_handle;
|
} else if (OB_FAIL(palf_handle_guard.locate_by_scn_coarsely(sync_scn, start_lsn))) {
|
||||||
if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) {
|
LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn));
|
||||||
LOG_WARN("failed to get ls", KR(ret));
|
}
|
||||||
} else {
|
if (FAILEDx(palf_handle_guard.seek(start_lsn, iterator))) {
|
||||||
ObLogHandler *log_handler = NULL;
|
LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn), K(start_lsn));
|
||||||
ObLS *ls = NULL;
|
|
||||||
palf::LSN start_lsn(0);
|
|
||||||
if (OB_ISNULL(ls = ls_handle.get_ls())
|
|
||||||
|| OB_ISNULL(log_handler = ls->get_log_handler())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("ls or log handle is null", KR(ret), KP(ls),
|
|
||||||
KP(log_handler));
|
|
||||||
} else {
|
|
||||||
if (SCN::base_scn() == sync_scn) {
|
|
||||||
// start_lsn = 0;
|
|
||||||
} else {
|
|
||||||
if (OB_FAIL(log_handler->locate_by_scn_coarsely(sync_scn, start_lsn))) {
|
|
||||||
LOG_WARN("failed to locate lsn", KR(ret), K(sync_scn));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
|
||||||
} else if (OB_FAIL(log_handler->seek(start_lsn, iterator))) {
|
|
||||||
LOG_WARN("failed to seek iterator", KR(ret), K(sync_scn), K(start_lsn));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -278,10 +285,6 @@ int ObRecoveryLSService::process_ls_log_(
|
|||||||
"SYS log scn beyond recovery_until_scn"))) {
|
"SYS log scn beyond recovery_until_scn"))) {
|
||||||
LOG_WARN("failed to report_sys_ls_recovery_stat_", KR(ret), K(sync_scn), K(tenant_info),
|
LOG_WARN("failed to report_sys_ls_recovery_stat_", KR(ret), K(sync_scn), K(tenant_info),
|
||||||
K(log_entry), K(target_lsn), K(start_scn));
|
K(log_entry), K(target_lsn), K(start_scn));
|
||||||
// SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log and reset start_scn
|
|
||||||
} else if (OB_FAIL(seek_log_iterator_(tenant_info.get_recovery_until_scn(), iterator))) {
|
|
||||||
LOG_WARN("failed to seek log iterator", KR(ret), K(sync_scn), K(tenant_info),
|
|
||||||
K(log_entry), K(target_lsn), K(start_scn));
|
|
||||||
} else {
|
} else {
|
||||||
ret = OB_ITER_STOP;
|
ret = OB_ITER_STOP;
|
||||||
LOG_WARN("SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log",
|
LOG_WARN("SYS LS has recovered to the recovery_until_scn, need stop iterate SYS LS log",
|
||||||
@ -422,6 +425,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
|
|||||||
"report recovery stat and has multi data source"))) {
|
"report recovery stat and has multi data source"))) {
|
||||||
LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn));
|
LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn));
|
||||||
}
|
}
|
||||||
|
ret = ERRSIM_END_TRANS_ERROR ? : ret;
|
||||||
END_TRANSACTION(trans)
|
END_TRANSACTION(trans)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -62,6 +62,10 @@ namespace transaction
|
|||||||
{
|
{
|
||||||
class ObTxLogBlock;
|
class ObTxLogBlock;
|
||||||
}
|
}
|
||||||
|
namespace palf
|
||||||
|
{
|
||||||
|
class PalfHandleGuard;
|
||||||
|
}
|
||||||
namespace rootserver
|
namespace rootserver
|
||||||
{
|
{
|
||||||
/*description:
|
/*description:
|
||||||
@ -83,7 +87,9 @@ public:
|
|||||||
private:
|
private:
|
||||||
//get log iterator by start_scn
|
//get log iterator by start_scn
|
||||||
//interface for thread0
|
//interface for thread0
|
||||||
|
int init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard);
|
||||||
int seek_log_iterator_(const share::SCN &syn_scn,
|
int seek_log_iterator_(const share::SCN &syn_scn,
|
||||||
|
palf::PalfHandleGuard &palf_handle_guard,
|
||||||
palf::PalfBufferIterator &iterator);
|
palf::PalfBufferIterator &iterator);
|
||||||
int process_ls_log_(const ObAllTenantInfo &tenant_info,
|
int process_ls_log_(const ObAllTenantInfo &tenant_info,
|
||||||
share::SCN &start_scn,
|
share::SCN &start_scn,
|
||||||
|
|||||||
Reference in New Issue
Block a user