Add issue_task_version in log restore

This commit is contained in:
obdev 2023-02-14 08:14:30 +00:00 committed by ob-robot
parent f6009fdcb4
commit e36fe6b9c8
11 changed files with 76 additions and 26 deletions

View File

@ -76,9 +76,11 @@ ObFetchLogTask::ObFetchLogTask(const share::ObLSID &id,
const SCN &pre_scn,
const palf::LSN &lsn,
const int64_t size,
const int64_t proposal_id) :
const int64_t proposal_id,
const int64_t version) :
id_(id),
proposal_id_(proposal_id),
version_(version),
start_lsn_(lsn),
cur_lsn_(lsn),
end_lsn_(lsn + size),
@ -93,6 +95,7 @@ bool ObFetchLogTask::is_valid() const
{
return id_.is_valid()
&& proposal_id_ > 0
&& version_ > 0
&& pre_scn_.is_valid()
&& start_lsn_.is_valid()
&& cur_lsn_.is_valid()
@ -104,6 +107,7 @@ void ObFetchLogTask::reset()
{
id_.reset();
proposal_id_ = 0;
version_ = 0;
pre_scn_.reset();
start_lsn_.reset();
cur_lsn_.reset();

View File

@ -35,18 +35,20 @@ public:
const share::SCN &pre_scn,
const palf::LSN &lsn,
const int64_t size,
const int64_t proposal_id);
const int64_t proposal_id,
const int64_t version);
~ObFetchLogTask() { reset(); }
bool is_valid() const;
void reset();
TO_STRING_KV(K_(id), K_(proposal_id), K_(pre_scn), K_(start_lsn), K_(cur_lsn),
K_(end_lsn), K_(max_fetch_scn), K_(max_submit_scn), K_(iter));
TO_STRING_KV(K_(id), K_(proposal_id), K_(version), K_(pre_scn), K_(start_lsn),
K_(cur_lsn), K_(end_lsn), K_(max_fetch_scn), K_(max_submit_scn), K_(iter));
public:
share::ObLSID id_;
// to distinguish stale tasks which is generated in previous leader
int64_t proposal_id_;
int64_t version_;
share::SCN pre_scn_; // heuristic log scn to locate piece, may be imprecise one
palf::LSN start_lsn_;
palf::LSN cur_lsn_;

View File

@ -56,8 +56,8 @@ void ObLogArchivePieceContext::RoundContext::reset()
bool ObLogArchivePieceContext::RoundContext::is_valid() const
{
return ((RoundContext::State::ACTIVE == state_ && end_scn_ == SCN::max_scn())
|| (RoundContext::State::STOP == state_ && end_scn_ > start_scn_))
return ((RoundContext::State::ACTIVE == state_ && min_piece_id_ > 0 && start_scn_ != SCN::max_scn())
|| (RoundContext::State::STOP == state_ && end_scn_ != SCN::max_scn() && end_scn_ > start_scn_))
&& round_id_ > 0
&& start_scn_.is_valid()
&& base_piece_id_ > 0

View File

@ -186,10 +186,13 @@ int ObLogRestoreHandler::add_source(logservice::DirArray &array, const SCN &end_
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObRemoteRawPathParent *source = static_cast<ObRemoteRawPathParent *>(parent_);
const bool source_exist = source->is_valid();
if (OB_FAIL(source->set(array, end_scn))) {
CLOG_LOG(WARN, "ObRemoteRawPathParent set failed", K(ret), K(array), K(end_scn));
ObResSrcAlloctor::free(parent_);
parent_ = NULL;
} else if (! source_exist) {
context_.set_issue_version();
}
}
return ret;;
@ -212,10 +215,13 @@ int ObLogRestoreHandler::add_source(share::ObBackupDest &dest, const SCN &end_sc
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObRemoteLocationParent *source = static_cast<ObRemoteLocationParent *>(parent_);
const bool source_exist = source->is_valid();
if (OB_FAIL(source->set(dest, end_scn))) {
CLOG_LOG(WARN, "ObRemoteLocationParent set failed", K(ret), K(end_scn), K(dest));
ObResSrcAlloctor::free(parent_);
parent_ = NULL;
} else if (! source_exist) {
context_.set_issue_version();
}
}
return ret;
@ -238,11 +244,14 @@ int ObLogRestoreHandler::add_source(const common::ObAddr &addr, const SCN &end_s
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObRemoteSerivceParent *source = static_cast<ObRemoteSerivceParent *>(parent_);
const bool source_exist = source->is_valid();
if (OB_FAIL(source->set(addr, end_scn))) {
CLOG_LOG(WARN, "ObRemoteSerivceParent set failed",
K(ret), K(end_scn), K(addr), KPC(this));
ObResSrcAlloctor::free(parent_);
parent_ = NULL;
} else if (! source_exist) {
context_.set_issue_version();
}
}
return ret;
@ -261,6 +270,7 @@ int ObLogRestoreHandler::clean_source()
CLOG_LOG(INFO, "log_restore_source is empty, clean source", KPC(parent_));
ObResSrcAlloctor::free(parent_);
parent_ = NULL;
context_.reset();
}
return ret;
}
@ -340,6 +350,7 @@ void ObLogRestoreHandler::deep_copy_source(ObRemoteSourceGuard &source_guard)
int ObLogRestoreHandler::schedule(const int64_t id,
const int64_t proposal_id,
const int64_t version,
const LSN &lsn,
bool &scheduled)
{
@ -350,7 +361,7 @@ int ObLogRestoreHandler::schedule(const int64_t id,
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(! lsn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
} else if (id != id_ || proposal_id != proposal_id_) {
} else if (id != id_ || proposal_id != proposal_id_ || version != context_.issue_version_) {
// stale task
} else {
scheduled = true;
@ -365,23 +376,24 @@ int ObLogRestoreHandler::try_retire_task(ObFetchLogTask &task, bool &done)
done = false;
int ret = OB_SUCCESS;
WLockGuard guard(lock_);
if (OB_UNLIKELY(!task.is_valid())) {
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(!task.is_valid() || task.id_.id() != id_)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(ret), K(task));
} else if (! is_strong_leader(role_) || NULL == parent_) {
done = true;
CLOG_LOG(INFO, "ls not leader, stale task, just skip it", K(task), K(role_));
} else if (OB_UNLIKELY(task.id_.id() != id_ || task.proposal_id_ != proposal_id_)) {
CLOG_LOG(INFO, "ls not leader or source is NULL, stale task, just skip it", K(task), K(role_));
} else if (OB_UNLIKELY(task.proposal_id_ != proposal_id_
|| task.version_ != context_.issue_version_)) {
done = true;
CLOG_LOG(INFO, "stale task, just skip it", K(task), KPC(this));
} else if (context_.max_fetch_lsn_ >= task.end_lsn_ || parent_->to_end()) {
done = true;
context_.issue_task_num_--;
} else if (context_.max_fetch_lsn_ >= task.start_lsn_) {
task.cur_lsn_ = context_.max_fetch_lsn_;
}
if (done) {
context_.issue_task_num_--;
}
return ret;
}

View File

@ -116,7 +116,11 @@ public:
// @brief the big range of log is separated into small tasks, so as to do parallel,
// concise dispatch is needed, here is to check if new task is in turn
int need_schedule(bool &need_schedule, int64_t &proposal_id, ObRemoteFetchContext &context) const;
int schedule(const int64_t id, const int64_t proposal_id, const LSN &lsn, bool &scheduled);
int schedule(const int64_t id,
const int64_t proposal_id,
const int64_t version,
const LSN &lsn,
bool &scheduled);
// @brief try retire fetch log task
// @param[in] ObFetchLogTask &, the remote fetch log task
// @param[out] bool &, the remote fetch log task is to end or not, retire it if true

View File

@ -11,6 +11,10 @@
*/
#include "ob_remote_fetch_context.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/time/ob_time_utility.h"
#include "lib/utility/ob_macro_utils.h"
namespace oceanbase
{
@ -20,6 +24,7 @@ namespace logservice
ObRemoteFetchContext &ObRemoteFetchContext::operator=(const ObRemoteFetchContext &other)
{
issue_task_num_ = other.issue_task_num_;
issue_version_ = other.issue_version_;
last_fetch_ts_ = other.last_fetch_ts_;
max_submit_lsn_ = other.max_submit_lsn_;
max_fetch_lsn_ = other.max_fetch_lsn_;
@ -31,6 +36,7 @@ ObRemoteFetchContext &ObRemoteFetchContext::operator=(const ObRemoteFetchContext
void ObRemoteFetchContext::reset()
{
issue_task_num_ = 0;
issue_version_ = OB_INVALID_TIMESTAMP;
last_fetch_ts_ = OB_INVALID_TIMESTAMP;
max_submit_lsn_.reset();
max_fetch_lsn_.reset();
@ -55,5 +61,15 @@ int ObRemoteFetchContext::reset_sorted_tasks()
return ret;
}
void ObRemoteFetchContext::set_issue_version()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TIMESTAMP != issue_version_)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "issue_version is valid", KPC(this));
} else {
issue_version_ = common::ObTimeUtility::current_time_ns();
}
}
} // namespace logservice
} // namespace oceanbase

View File

@ -28,6 +28,7 @@ namespace logservice
struct ObRemoteFetchContext
{
int64_t issue_task_num_;
int64_t issue_version_;
int64_t last_fetch_ts_; // 最后一次拉日志时间
palf::LSN max_submit_lsn_; // 提交远程日志拉取任务最大LSN
palf::LSN max_fetch_lsn_; // 拉到最后一条日志end_lsn
@ -40,8 +41,10 @@ struct ObRemoteFetchContext
ObRemoteFetchContext &operator=(const ObRemoteFetchContext &other);
void reset();
int reset_sorted_tasks();
TO_STRING_KV(K_(issue_task_num), K_(last_fetch_ts), K_(max_submit_lsn), K_(max_fetch_lsn),
K_(max_fetch_scn), K_(error_context), "task_count", submit_array_.count());
void set_issue_version();
TO_STRING_KV(K_(issue_task_num), K_(issue_version), K_(last_fetch_ts),
K_(max_submit_lsn), K_(max_fetch_lsn), K_(max_fetch_scn),
K_(error_context), "task_count", submit_array_.count());
};
} // namespace logservice
} // namespace oceanbase

View File

@ -125,6 +125,7 @@ int ObRemoteFetchLogImpl::do_fetch_log_(ObLS &ls)
bool can_fetch_log = false;
bool need_schedule = false;
int64_t proposal_id = -1;
int64_t version = 0;
LSN lsn;
SCN pre_scn; // scn to locate piece
LSN max_fetch_lsn;
@ -138,13 +139,13 @@ int ObRemoteFetchLogImpl::do_fetch_log_(ObLS &ls)
LOG_WARN("check replica status failed", K(ret), K(ls));
} else if (! can_fetch_log) {
// just skip
} else if (OB_FAIL(check_need_schedule_(ls, need_schedule,
proposal_id, max_fetch_lsn, last_fetch_ts, task_count))) {
} else if (OB_FAIL(check_need_schedule_(ls, need_schedule, proposal_id,
version, max_fetch_lsn, last_fetch_ts, task_count))) {
LOG_WARN("check need schedule failed", K(ret), K(id));
} else if (! need_schedule) {
} else if (OB_FAIL(get_fetch_log_base_lsn_(ls, max_fetch_lsn, last_fetch_ts, pre_scn, lsn))) {
LOG_WARN("get fetch log base lsn failed", K(ret), K(id));
} else if (OB_FAIL(submit_fetch_log_task_(ls, pre_scn, lsn, task_count, proposal_id))) {
} else if (OB_FAIL(submit_fetch_log_task_(ls, pre_scn, lsn, task_count, proposal_id, version))) {
LOG_WARN("submit fetch log task failed", K(ret), K(id), K(lsn), K(task_count));
} else {
LOG_TRACE("do fetch log succ", K(id), K(lsn), K(task_count));
@ -169,6 +170,7 @@ int ObRemoteFetchLogImpl::check_replica_status_(ObLS &ls, bool &can_fetch_log)
int ObRemoteFetchLogImpl::check_need_schedule_(ObLS &ls,
bool &need_schedule,
int64_t &proposal_id,
int64_t &version,
LSN &lsn,
int64_t &last_fetch_ts,
int64_t &task_count)
@ -195,6 +197,7 @@ int ObRemoteFetchLogImpl::check_need_schedule_(ObLS &ls,
} else if (need_delay) {
need_schedule = false;
} else {
version = context.issue_version_;
lsn = context.max_submit_lsn_;
last_fetch_ts = context.last_fetch_ts_;
task_count = concurrency - context.issue_task_num_;
@ -264,7 +267,8 @@ int ObRemoteFetchLogImpl::submit_fetch_log_task_(ObLS &ls,
const SCN &scn,
const LSN &lsn,
const int64_t task_count,
const int64_t proposal_id)
const int64_t proposal_id,
const int64_t version)
{
int ret = OB_SUCCESS;
LSN start_lsn = lsn;
@ -275,7 +279,7 @@ int ObRemoteFetchLogImpl::submit_fetch_log_task_(ObLS &ls,
const LSN end_lsn = LSN((start_lsn.val_ / max_size + 1) * max_size);
const int64_t size = static_cast<int64_t>(end_lsn - start_lsn);
scheduled = false;
if (OB_FAIL(do_submit_fetch_log_task_(ls, scn, start_lsn, size, proposal_id, scheduled))) {
if (OB_FAIL(do_submit_fetch_log_task_(ls, scn, start_lsn, size, proposal_id, version, scheduled))) {
LOG_WARN("do submit fetch log task failed", K(ret), K(ls));
} else if (! scheduled) {
break;
@ -291,6 +295,7 @@ int ObRemoteFetchLogImpl::do_submit_fetch_log_task_(ObLS &ls,
const LSN &lsn,
const int64_t size,
const int64_t proposal_id,
const int64_t version,
bool &scheduled)
{
int ret = OB_SUCCESS;
@ -302,12 +307,12 @@ int ObRemoteFetchLogImpl::do_submit_fetch_log_task_(ObLS &ls,
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(id));
} else {
task = new (data) ObFetchLogTask(id, scn, lsn, size, proposal_id);
task = new (data) ObFetchLogTask(id, scn, lsn, size, proposal_id, version);
ObLogRestoreHandler *restore_handler = NULL;
if (OB_ISNULL(restore_handler = ls.get_log_restore_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get restore_handler failed", K(ret), K(ls));
} else if (OB_FAIL(restore_handler->schedule(id.id(), proposal_id, lsn + size, scheduled))) {
} else if (OB_FAIL(restore_handler->schedule(id.id(), proposal_id, version, lsn + size, scheduled))) {
LOG_WARN("schedule failed", K(ret), K(ls));
} else if (! scheduled) {
// not scheduled

View File

@ -61,12 +61,14 @@ private:
int do_fetch_log_(ObLS &ls);
int check_replica_status_(ObLS &ls, bool &can_fetch_log);
int check_need_schedule_(ObLS &ls, bool &need_schedule, int64_t &proposal_id,
LSN &lsn, int64_t &last_fetch_ts, int64_t &task_count);
int64_t &version, LSN &lsn, int64_t &last_fetch_ts, int64_t &task_count);
int check_need_delay_(const ObLSID &id, bool &need_delay);
int get_fetch_log_base_lsn_(ObLS &ls, const LSN &max_fetch_lsn, const int64_t last_fetch_ts, share::SCN &scn, LSN &lsn);
int get_palf_base_lsn_scn_(ObLS &ls, LSN &lsn, share::SCN &scn);
int submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t task_count, const int64_t proposal_id);
int do_submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t size, const int64_t proposal_id, bool &scheduled);
int submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn,
const int64_t task_count, const int64_t proposal_id, const int64_t version);
int do_submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t size,
const int64_t proposal_id, const int64_t version, bool &scheduled);
private:
bool inited_;

View File

@ -239,6 +239,7 @@ void ObRemoteFetchWorker::do_thread_task_()
int ObRemoteFetchWorker::handle_single_task_()
{
DEBUG_SYNC(BEFORE_RESTORE_HANDLE_FETCH_LOG_TASK);
int ret = OB_SUCCESS;
void *data = NULL;
if (OB_FAIL(task_queue_.pop(data))) {

View File

@ -448,6 +448,7 @@ class ObString;
ACT(BEFORE_BUILD_TABLET_GROUP_INFO,)\
ACT(BEFORE_RESTORE_SERVICE_PUSH_FETCH_DATA,)\
ACT(AFTER_MIGRATION_REPORT_LS_META_TABLE,)\
ACT(BEFORE_RESTORE_HANDLE_FETCH_LOG_TASK,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);