Add restore_to_end interface

This commit is contained in:
obdev 2023-04-21 07:15:06 +00:00 committed by ob-robot
parent 9659dc0227
commit 27b208972f
4 changed files with 42 additions and 17 deletions

View File

@ -178,7 +178,7 @@ int ObLogRestoreHandler::get_max_restore_scn(SCN &scn) const
{
RLockGuard guard(lock_);
int ret = OB_SUCCESS;
if (OB_ISNULL(parent_) || ! parent_->to_end()) {
if (OB_ISNULL(parent_) || ! restore_to_end_unlock_()) {
ret = OB_EAGAIN;
} else {
parent_->get_end_scn(scn);
@ -325,7 +325,7 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
CLOG_LOG(WARN, "invalid argument", K(ret), K(proposal_id), K(lsn), K(buf), K(buf_size));
} else if (proposal_id != proposal_id_) {
CLOG_LOG(INFO, "stale task, just skip", K(proposal_id), K(proposal_id_), K(lsn), K(id_));
} else if (NULL == parent_ || parent_->to_end()) {
} else if (NULL == parent_ || restore_to_end_unlock_()) {
ret = OB_RESTORE_LOG_TO_END;
CLOG_LOG(INFO, "submit log to end, just skip", K(ret), K(lsn), KPC(this));
} else {
@ -417,7 +417,7 @@ int ObLogRestoreHandler::try_retire_task(ObFetchLogTask &task, bool &done)
CLOG_LOG(INFO, "restore max_lsn bigger than task end_lsn, just skip it", K(task), KPC(this));
done = true;
context_.issue_task_num_--;
} else if (parent_->to_end()) {
} else if (restore_to_end_unlock_()) {
// when restore is set to_end, issue_version_ is advanced, and all issued tasks before are stale tasks as stale issue_version_
CLOG_LOG(ERROR, "error unexpected, log restored to_end, just skip it", K(task), KPC(this), K(parent_));
done = true;
@ -440,7 +440,7 @@ int ObLogRestoreHandler::need_schedule(bool &need_schedule,
} else if (OB_SUCCESS != context_.error_context_.ret_code_) {
// error exist, no need schedule
} else {
need_schedule = is_strong_leader(role_) && ! parent_->to_end();
need_schedule = is_strong_leader(role_) && ! restore_to_end_unlock_();
proposal_id = proposal_id_;
context = context_;
}
@ -458,17 +458,23 @@ bool ObLogRestoreHandler::need_update_source() const
return is_strong_leader(role_);
}
void ObLogRestoreHandler::mark_error(share::ObTaskId &trace_id, const int ret_code)
void ObLogRestoreHandler::mark_error(share::ObTaskId &trace_id, const int ret_code, const palf::LSN &lsn)
{
int ret = OB_SUCCESS;
palf::LSN end_lsn;
WLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (! is_strong_leader(role_)) {
CLOG_LOG(INFO, "not leader, no need record error", K(id_), K(ret_code));
} else if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get end_lsn failed", K(id_));
} else if (end_lsn < lsn) {
CLOG_LOG(WARN, "end_lsn smaller than error lsn, just skip", K(id_), K(end_lsn), K(lsn), KPC(parent_), KPC(this));
} else if (OB_SUCCESS == context_.error_context_.ret_code_) {
context_.error_context_.ret_code_ = ret_code;
context_.error_context_.trace_id_.set(trace_id);
CLOG_LOG(ERROR, "fatal error occur in restore", KPC(parent_), KPC(this));
}
}
@ -674,7 +680,7 @@ int ObLogRestoreHandler::get_next_sorted_task(ObFetchLogTask *&task)
ret = OB_NOT_INIT;
} else if (! is_strong_leader(role_)) {
ret = OB_NOT_MASTER;
} else if (NULL != parent_ && parent_->to_end()) {
} else if (NULL != parent_ && restore_to_end_unlock_()) {
// if restore to end, free all cached tasks
ret = context_.reset_sorted_tasks();
} else if (context_.submit_array_.empty()) {
@ -734,5 +740,23 @@ int ObLogRestoreHandler::diagnose(RestoreDiagnoseInfo &diagnose_info)
return ret;
}
bool ObLogRestoreHandler::restore_to_end_unlock_() const
{
int ret = OB_SUCCESS;
bool bret = false;
share::SCN scn;
share::SCN recovery_end_scn;
if (NULL == parent_) {
bret = false;
} else if (parent_->to_end()) {
bret = true;
} else if (OB_FAIL(palf_handle_.get_end_scn(scn))) {
CLOG_LOG(WARN, "get end scn failed", K(id_));
} else {
parent_->get_upper_limit_scn(recovery_end_scn);
bret = scn >= recovery_end_scn;
}
return bret;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -151,7 +151,7 @@ public:
// return true only if in restore state and all replicas have restore and replay finish
int check_restore_done(const share::SCN &recovery_end_scn, bool &done);
// @brief set error if error occurs in log restore
void mark_error(share::ObTaskId &trace_id, const int ret_code);
void mark_error(share::ObTaskId &trace_id, const int ret_code, const palf::LSN &lsn);
// @brief get restore error for report
int get_restore_error(share::ObTaskId &trace_id, int &ret_code, bool &error_exist);
// @brief Before the standby tenant switchover to primary, check if all primary logs are restored in the standby
@ -182,6 +182,7 @@ private:
int check_replay_done_(const share::SCN &scn, bool &done);
int check_replica_replay_done_(const share::SCN &scn, common::ObMemberList &member_list, bool &done);
int check_member_list_change_(common::ObMemberList &member_list, bool &member_list_change);
bool restore_to_end_unlock_() const;
private:
ObRemoteLogParent *parent_;

View File

@ -268,6 +268,7 @@ int ObRemoteFetchWorker::handle_single_task_()
} else {
ObFetchLogTask *task = static_cast<ObFetchLogTask *>(data);
ObLSID id = task->id_;
palf::LSN cur_lsn = task->cur_lsn_;
// after task handle, DON'T print it any more
if (OB_FAIL(handle_fetch_log_task_(task))) {
LOG_WARN("handle fetch log task failed", K(ret), KP(task), K(id));
@ -275,7 +276,7 @@ int ObRemoteFetchWorker::handle_single_task_()
// only fatal error report fail, retry with others
if (is_fatal_error_(ret)) {
report_error_(id, ret);
report_error_(id, ret, cur_lsn);
}
}
return ret;
@ -300,14 +301,13 @@ int ObRemoteFetchWorker::handle_fetch_log_task_(ObFetchLogTask *task)
LOG_WARN("push submit array failed", K(ret));
}
// if fatal error encounter, just free task. Restore can only be recovered with new log_restore_source
if (is_fatal_error_(ret)) {
LOG_ERROR("free fetch log task with restore fatal error", K(ret), KPC(task));
inner_free_task_(*task);
} else if (OB_SUCC(ret) && ! empty) {
if (OB_SUCC(ret) && ! empty) {
// pre_read succ and push submit array succ, do nothing,
} else {
if (! empty && OB_FAIL(ret)) {
if (is_fatal_error_(ret)) {
// fatal error may be false positive, for example restore in parallel, the range in pre-read maybe surpass the current log archive round, which not needed.
LOG_WARN("fatal error occur", K(ret), KPC(task));
} else if (! empty && OB_FAIL(ret)) {
LOG_WARN("task data not empty and push submit array failed, try retire task", K(ret), KPC(task));
} else if (OB_SUCC(ret)) {
// pre_read data is empty, do notning
@ -568,11 +568,11 @@ bool ObRemoteFetchWorker::is_fatal_error_(const int ret_code) const
|| OB_ARCHIVE_LOG_RECYCLED == ret_code;
}
void ObRemoteFetchWorker::report_error_(const ObLSID &id, const int ret_code)
void ObRemoteFetchWorker::report_error_(const ObLSID &id, const int ret_code, const palf::LSN &lsn)
{
int ret = OB_SUCCESS;
GET_RESTORE_HANDLER_CTX(id) {
restore_handler->mark_error(*ObCurTraceId::get_trace_id(), ret_code);
restore_handler->mark_error(*ObCurTraceId::get_trace_id(), ret_code, lsn);
}
}

View File

@ -94,7 +94,7 @@ private:
bool is_retry_ret_code_(const int ret_code) const;
bool is_fatal_error_(const int ret_code) const;
void report_error_(const ObLSID &id, const int ret_code);
void report_error_(const ObLSID &id, const int ret_code, const palf::LSN &lsn);
private:
bool inited_;
uint64_t tenant_id_;