[CP] fix sys ls report
This commit is contained in:
@ -146,9 +146,7 @@ void ObRecoveryLSService::do_work()
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(inited_), KP(proxy_));
|
||||
} else {
|
||||
ObLSRecoveryStatOperator ls_recovery;
|
||||
palf::PalfBufferIterator iterator;//can not use without palf_guard
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t idle_time_us = 100 * 1000L;
|
||||
SCN start_scn;
|
||||
last_report_ts_ = OB_INVALID_TIMESTAMP;
|
||||
@ -174,62 +172,16 @@ void ObRecoveryLSService::do_work()
|
||||
restore_status_.reset();
|
||||
} else if (0 == thread_idx) {
|
||||
idle_time_us = 10 * 1000 * 1000L;
|
||||
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD0);
|
||||
//adjust primary zone and balance ls group
|
||||
if (OB_TMP_FAIL(do_standby_balance_())) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("do standby balance", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(do_ls_balance_task_())) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("failed to process ls balance task", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
|
||||
(void)try_tenant_upgrade_end_();
|
||||
if (tenant_info.is_standby()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
(void)try_update_primary_ip_list();
|
||||
}
|
||||
if (OB_FAIL(process_thread0_(tenant_info))) {
|
||||
LOG_WARN("failed to process thread0", KR(ret));
|
||||
}
|
||||
} else {
|
||||
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD1);
|
||||
palf::PalfHandleGuard palf_handle_guard;
|
||||
if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) {
|
||||
LOG_WARN("failed to init palf handle guard", KR(ret));
|
||||
} else if (!start_scn.is_valid()) {
|
||||
ObLSRecoveryStat ls_recovery_stat;
|
||||
if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_,
|
||||
SYS_LS, false, ls_recovery_stat, *proxy_))) {
|
||||
LOG_WARN("failed to load sys recovery stat", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(report_sys_ls_recovery_stat_(ls_recovery_stat.get_sync_scn(), true,
|
||||
"report readable_scn while start_scn is invalid"))) {
|
||||
//may recovery end, but readable scn need report
|
||||
LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat));
|
||||
} else if (tenant_info.get_recovery_until_scn() <= ls_recovery_stat.get_sync_scn()) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { // every minute
|
||||
LOG_INFO("has recovered to recovery_until_scn", KR(ret), K(ls_recovery_stat), K(tenant_info));
|
||||
}
|
||||
} else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), palf_handle_guard, iterator))) {
|
||||
LOG_WARN("failed to seek log iterator", KR(ret), K(ls_recovery_stat));
|
||||
} else {
|
||||
start_scn = ls_recovery_stat.get_sync_scn();
|
||||
LOG_INFO("start to seek at", K(start_scn));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (start_scn.is_valid() && OB_FAIL(process_ls_log_(tenant_info, start_scn, iterator))) {
|
||||
if (OB_ITER_STOP != ret) {
|
||||
LOG_WARN("failed to process ls log", KR(ret), K(start_scn), K(tenant_info));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
start_scn.reset();
|
||||
if (OB_FAIL(process_thread1_(tenant_info, start_scn, iterator))) {
|
||||
LOG_WARN("failed to process thread1", KR(ret));
|
||||
}
|
||||
}//end thread1
|
||||
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), KR(tmp_ret),
|
||||
K(start_scn), K(thread_idx), K(tenant_info), K(idle_time_us), K_(restore_status));
|
||||
LOG_INFO("[LS_RECOVERY] finish one round", KR(ret), K(idle_time_us),
|
||||
K(start_scn), K(thread_idx), K(tenant_info), K_(restore_status));
|
||||
idle(idle_time_us);
|
||||
ret = OB_SUCCESS;
|
||||
}//end while
|
||||
@ -237,6 +189,80 @@ void ObRecoveryLSService::do_work()
|
||||
}
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::process_thread0_(const ObAllTenantInfo &tenant_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD0);
|
||||
//adjust primary zone and balance ls group
|
||||
if (OB_TMP_FAIL(do_standby_balance_())) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("do standby balance", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
if (OB_TMP_FAIL(do_ls_balance_task_())) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("failed to process ls balance task", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
(void)try_tenant_upgrade_end_();
|
||||
if (tenant_info.is_standby()) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
(void)try_update_primary_ip_list();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::process_thread1_(const ObAllTenantInfo &tenant_info,
|
||||
share::SCN &start_scn,
|
||||
palf::PalfBufferIterator &iterator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(STOP_RECOVERY_LS_THREAD1);
|
||||
if (OB_UNLIKELY(!tenant_info.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("tenant info is invalid", KR(ret), K(tenant_info));
|
||||
} else if (OB_UNLIKELY(!inited_) || OB_ISNULL(proxy_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(inited_), KP(proxy_));
|
||||
} else {
|
||||
palf::PalfHandleGuard palf_handle_guard;
|
||||
ObLSRecoveryStat ls_recovery_stat;
|
||||
ObLSRecoveryStatOperator ls_recovery;
|
||||
if (OB_FAIL(init_palf_handle_guard_(palf_handle_guard))) {
|
||||
LOG_WARN("failed to init palf handle guard", KR(ret));
|
||||
} else if (!start_scn.is_valid()) {
|
||||
if (OB_FAIL(ls_recovery.get_ls_recovery_stat(tenant_id_,
|
||||
SYS_LS, false, ls_recovery_stat, *proxy_))) {
|
||||
LOG_WARN("failed to load sys recovery stat", KR(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(report_sys_ls_recovery_stat_(ls_recovery_stat.get_sync_scn(), true,
|
||||
"report readable_scn while start_scn is invalid"))) {
|
||||
//may recovery end, but readable scn need report
|
||||
LOG_WARN("failed to report ls recovery stat", KR(ret), K(ls_recovery_stat));
|
||||
} else if (tenant_info.get_recovery_until_scn() <= ls_recovery_stat.get_sync_scn()) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { // every minute
|
||||
LOG_INFO("has recovered to recovery_until_scn", KR(ret), K(ls_recovery_stat), K(tenant_info));
|
||||
}
|
||||
} else if (OB_FAIL(seek_log_iterator_(ls_recovery_stat.get_sync_scn(), palf_handle_guard, iterator))) {
|
||||
LOG_WARN("failed to seek log iterator", KR(ret), K(ls_recovery_stat));
|
||||
} else {
|
||||
start_scn = ls_recovery_stat.get_sync_scn();
|
||||
LOG_INFO("start to seek at", K(start_scn));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (start_scn.is_valid() && OB_FAIL(process_ls_log_(tenant_info, start_scn, iterator))) {
|
||||
if (OB_ITER_STOP != ret) {
|
||||
LOG_WARN("failed to process ls log", KR(ret), K(start_scn), K(tenant_info));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
start_scn.reset();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRecoveryLSService::init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -326,6 +352,8 @@ int ObRecoveryLSService::process_ls_log_(
|
||||
start_scn.reset();
|
||||
restore_status_.reset(); // need to reset restore status if iterate to recovery end
|
||||
}
|
||||
//防止sync_scn - 1被汇报上去
|
||||
sync_scn.reset();
|
||||
} else if (OB_FAIL(header.deserialize(log_buf, HEADER_SIZE, log_pos))) {
|
||||
LOG_WARN("failed to deserialize", KR(ret), K(HEADER_SIZE));
|
||||
} else if (OB_UNLIKELY(log_pos >= log_length)) {
|
||||
@ -353,15 +381,22 @@ int ObRecoveryLSService::process_ls_log_(
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
last_sync_scn = sync_scn;
|
||||
} else if (sync_scn.is_valid()) {
|
||||
//如果本条日志不可用,至少汇报sync_scn - 1,
|
||||
//防止拉日志齐步走和系统日志流的汇报参考其他日志流出现的死锁问题
|
||||
last_sync_scn = SCN::scn_dec(sync_scn);
|
||||
}
|
||||
|
||||
if (last_sync_scn.is_valid() && (OB_FAIL(ret) || OB_INVALID_TIMESTAMP == last_report_ts_
|
||||
|| ObTimeUtility::current_time() - last_report_ts_ > 100 * 1000)) {
|
||||
//if ls_operator can not process, need to report last sync scn
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const char* comment = OB_SUCC(ret) ? "regular report when iterate log" :
|
||||
"report sync_scn -1 when not valid to process";
|
||||
if (OB_TMP_FAIL(report_sys_ls_recovery_stat_(last_sync_scn, false,
|
||||
"regular report when iterate log"))) {
|
||||
LOG_WARN("failed to report ls recovery stat", KR(ret), KR(tmp_ret), K(last_sync_scn));
|
||||
comment))) {
|
||||
LOG_WARN("failed to report ls recovery stat", KR(ret), KR(tmp_ret),
|
||||
K(last_sync_scn), K(comment));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,6 +87,10 @@ public:
|
||||
virtual void do_work() override;
|
||||
DEFINE_MTL_FUNC(ObRecoveryLSService)
|
||||
private:
|
||||
int process_thread0_(const ObAllTenantInfo &tenant_info);
|
||||
int process_thread1_(const ObAllTenantInfo &tenant_info,
|
||||
share::SCN &start_scn,
|
||||
palf::PalfBufferIterator &iterator);
|
||||
//get log iterator by start_scn
|
||||
//interface for thread0
|
||||
int init_palf_handle_guard_(palf::PalfHandleGuard &palf_handle_guard);
|
||||
@ -150,6 +154,7 @@ private:
|
||||
int do_update_restore_source_(ObRestoreSourceServiceAttr &old_attr, ObLogRestoreSourceMgr &restore_source_mgr);
|
||||
int update_source_inner_table_(char *buf, const int64_t buf_size, ObMySQLTransaction &trans, const ObLogRestoreSourceItem &item);
|
||||
int get_ls_all_replica_readable_scn_(const share::ObLSID &ls_id, share::SCN &reabable_scn);
|
||||
|
||||
private:
|
||||
bool inited_;
|
||||
uint64_t tenant_id_;
|
||||
|
||||
Reference in New Issue
Block a user