diff --git a/src/logservice/restoreservice/ob_fetch_log_task.cpp b/src/logservice/restoreservice/ob_fetch_log_task.cpp index 3a9188365..1019c82d4 100644 --- a/src/logservice/restoreservice/ob_fetch_log_task.cpp +++ b/src/logservice/restoreservice/ob_fetch_log_task.cpp @@ -76,9 +76,11 @@ ObFetchLogTask::ObFetchLogTask(const share::ObLSID &id, const SCN &pre_scn, const palf::LSN &lsn, const int64_t size, - const int64_t proposal_id) : + const int64_t proposal_id, + const int64_t version) : id_(id), proposal_id_(proposal_id), + version_(version), start_lsn_(lsn), cur_lsn_(lsn), end_lsn_(lsn + size), @@ -93,6 +95,7 @@ bool ObFetchLogTask::is_valid() const { return id_.is_valid() && proposal_id_ > 0 + && version_ > 0 && pre_scn_.is_valid() && start_lsn_.is_valid() && cur_lsn_.is_valid() @@ -104,6 +107,7 @@ void ObFetchLogTask::reset() { id_.reset(); proposal_id_ = 0; + version_ = 0; pre_scn_.reset(); start_lsn_.reset(); cur_lsn_.reset(); diff --git a/src/logservice/restoreservice/ob_fetch_log_task.h b/src/logservice/restoreservice/ob_fetch_log_task.h index de35ade39..33125f367 100644 --- a/src/logservice/restoreservice/ob_fetch_log_task.h +++ b/src/logservice/restoreservice/ob_fetch_log_task.h @@ -35,18 +35,20 @@ public: const share::SCN &pre_scn, const palf::LSN &lsn, const int64_t size, - const int64_t proposal_id); + const int64_t proposal_id, + const int64_t version); ~ObFetchLogTask() { reset(); } bool is_valid() const; void reset(); - TO_STRING_KV(K_(id), K_(proposal_id), K_(pre_scn), K_(start_lsn), K_(cur_lsn), - K_(end_lsn), K_(max_fetch_scn), K_(max_submit_scn), K_(iter)); + TO_STRING_KV(K_(id), K_(proposal_id), K_(version), K_(pre_scn), K_(start_lsn), + K_(cur_lsn), K_(end_lsn), K_(max_fetch_scn), K_(max_submit_scn), K_(iter)); public: share::ObLSID id_; // to distinguish stale tasks which is generated in previous leader int64_t proposal_id_; + int64_t version_; share::SCN pre_scn_; // heuristic log scn to locate piece, may be imprecise one palf::LSN start_lsn_; palf::LSN cur_lsn_; diff --git a/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp b/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp index b4642e827..689d52715 100644 --- a/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp +++ b/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp @@ -56,8 +56,8 @@ void ObLogArchivePieceContext::RoundContext::reset() bool ObLogArchivePieceContext::RoundContext::is_valid() const { - return ((RoundContext::State::ACTIVE == state_ && end_scn_ == SCN::max_scn()) - || (RoundContext::State::STOP == state_ && end_scn_ > start_scn_)) + return ((RoundContext::State::ACTIVE == state_ && min_piece_id_ > 0 && start_scn_ != SCN::max_scn()) + || (RoundContext::State::STOP == state_ && end_scn_ != SCN::max_scn() && end_scn_ > start_scn_)) && round_id_ > 0 && start_scn_.is_valid() && base_piece_id_ > 0 diff --git a/src/logservice/restoreservice/ob_log_restore_handler.cpp b/src/logservice/restoreservice/ob_log_restore_handler.cpp index fc6c4be78..dfb383124 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.cpp +++ b/src/logservice/restoreservice/ob_log_restore_handler.cpp @@ -186,10 +186,13 @@ int ObLogRestoreHandler::add_source(logservice::DirArray &array, const SCN &end_ ret = OB_ALLOCATE_MEMORY_FAILED; } else { ObRemoteRawPathParent *source = static_cast(parent_); + const bool source_exist = source->is_valid(); if (OB_FAIL(source->set(array, end_scn))) { CLOG_LOG(WARN, "ObRemoteRawPathParent set failed", K(ret), K(array), K(end_scn)); ObResSrcAlloctor::free(parent_); parent_ = NULL; + } else if (! source_exist) { + context_.set_issue_version(); } } return ret;; @@ -212,10 +215,13 @@ int ObLogRestoreHandler::add_source(share::ObBackupDest &dest, const SCN &end_sc ret = OB_ALLOCATE_MEMORY_FAILED; } else { ObRemoteLocationParent *source = static_cast(parent_); + const bool source_exist = source->is_valid(); if (OB_FAIL(source->set(dest, end_scn))) { CLOG_LOG(WARN, "ObRemoteLocationParent set failed", K(ret), K(end_scn), K(dest)); ObResSrcAlloctor::free(parent_); parent_ = NULL; + } else if (! source_exist) { + context_.set_issue_version(); } } return ret; @@ -238,11 +244,14 @@ int ObLogRestoreHandler::add_source(const common::ObAddr &addr, const SCN &end_s ret = OB_ALLOCATE_MEMORY_FAILED; } else { ObRemoteSerivceParent *source = static_cast(parent_); + const bool source_exist = source->is_valid(); if (OB_FAIL(source->set(addr, end_scn))) { CLOG_LOG(WARN, "ObRemoteSerivceParent set failed", K(ret), K(end_scn), K(addr), KPC(this)); ObResSrcAlloctor::free(parent_); parent_ = NULL; + } else if (! source_exist) { + context_.set_issue_version(); } } return ret; @@ -261,6 +270,7 @@ int ObLogRestoreHandler::clean_source() CLOG_LOG(INFO, "log_restore_source is empty, clean source", KPC(parent_)); ObResSrcAlloctor::free(parent_); parent_ = NULL; + context_.reset(); } return ret; } @@ -340,6 +350,7 @@ void ObLogRestoreHandler::deep_copy_source(ObRemoteSourceGuard &source_guard) int ObLogRestoreHandler::schedule(const int64_t id, const int64_t proposal_id, + const int64_t version, const LSN &lsn, bool &scheduled) { @@ -350,7 +361,7 @@ int ObLogRestoreHandler::schedule(const int64_t id, ret = OB_NOT_INIT; } else if (OB_UNLIKELY(! lsn.is_valid())) { ret = OB_INVALID_ARGUMENT; - } else if (id != id_ || proposal_id != proposal_id_) { + } else if (id != id_ || proposal_id != proposal_id_ || version != context_.issue_version_) { // stale task } else { scheduled = true; @@ -365,23 +376,24 @@ int ObLogRestoreHandler::try_retire_task(ObFetchLogTask &task, bool &done) done = false; int ret = OB_SUCCESS; WLockGuard guard(lock_); - if (OB_UNLIKELY(!task.is_valid())) { + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(!task.is_valid() || task.id_.id() != id_)) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid argument", K(ret), K(task)); } else if (! is_strong_leader(role_) || NULL == parent_) { done = true; - CLOG_LOG(INFO, "ls not leader, stale task, just skip it", K(task), K(role_)); - } else if (OB_UNLIKELY(task.id_.id() != id_ || task.proposal_id_ != proposal_id_)) { + CLOG_LOG(INFO, "ls not leader or source is NULL, stale task, just skip it", K(task), K(role_)); + } else if (OB_UNLIKELY(task.proposal_id_ != proposal_id_ + || task.version_ != context_.issue_version_)) { done = true; CLOG_LOG(INFO, "stale task, just skip it", K(task), KPC(this)); } else if (context_.max_fetch_lsn_ >= task.end_lsn_ || parent_->to_end()) { done = true; + context_.issue_task_num_--; } else if (context_.max_fetch_lsn_ >= task.start_lsn_) { task.cur_lsn_ = context_.max_fetch_lsn_; } - if (done) { - context_.issue_task_num_--; - } return ret; } diff --git a/src/logservice/restoreservice/ob_log_restore_handler.h b/src/logservice/restoreservice/ob_log_restore_handler.h index 135d0d288..23f4d57b5 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.h +++ b/src/logservice/restoreservice/ob_log_restore_handler.h @@ -116,7 +116,11 @@ public: // @brief the big range of log is separated into small tasks, so as to do parallel, // concise dispatch is needed, here is to check if new task is in turn int need_schedule(bool &need_schedule, int64_t &proposal_id, ObRemoteFetchContext &context) const; - int schedule(const int64_t id, const int64_t proposal_id, const LSN &lsn, bool &scheduled); + int schedule(const int64_t id, + const int64_t proposal_id, + const int64_t version, + const LSN &lsn, + bool &scheduled); // @brief try retire fetch log task // @param[in] ObFetchLogTask &, the remote fetch log task // @param[out] bool &, the remote fetch log task is to end or not, retire it if true diff --git a/src/logservice/restoreservice/ob_remote_fetch_context.cpp b/src/logservice/restoreservice/ob_remote_fetch_context.cpp index 8cfe01f6a..d6da72fe7 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_context.cpp +++ b/src/logservice/restoreservice/ob_remote_fetch_context.cpp @@ -11,6 +11,10 @@ */ #include "ob_remote_fetch_context.h" +#include "lib/ob_define.h" +#include "lib/ob_errno.h" +#include "lib/time/ob_time_utility.h" +#include "lib/utility/ob_macro_utils.h" namespace oceanbase { @@ -20,6 +24,7 @@ namespace logservice ObRemoteFetchContext &ObRemoteFetchContext::operator=(const ObRemoteFetchContext &other) { issue_task_num_ = other.issue_task_num_; + issue_version_ = other.issue_version_; last_fetch_ts_ = other.last_fetch_ts_; max_submit_lsn_ = other.max_submit_lsn_; max_fetch_lsn_ = other.max_fetch_lsn_; @@ -31,6 +36,7 @@ ObRemoteFetchContext &ObRemoteFetchContext::operator=(const ObRemoteFetchContext void ObRemoteFetchContext::reset() { issue_task_num_ = 0; + issue_version_ = OB_INVALID_TIMESTAMP; last_fetch_ts_ = OB_INVALID_TIMESTAMP; max_submit_lsn_.reset(); max_fetch_lsn_.reset(); @@ -55,5 +61,15 @@ int ObRemoteFetchContext::reset_sorted_tasks() return ret; } +void ObRemoteFetchContext::set_issue_version() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_TIMESTAMP != issue_version_)) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "issue_version is valid", KPC(this)); + } else { + issue_version_ = common::ObTimeUtility::current_time_ns(); + } +} } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_remote_fetch_context.h b/src/logservice/restoreservice/ob_remote_fetch_context.h index fc2cc7e2a..c66cb4770 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_context.h +++ b/src/logservice/restoreservice/ob_remote_fetch_context.h @@ -28,6 +28,7 @@ namespace logservice struct ObRemoteFetchContext { int64_t issue_task_num_; + int64_t issue_version_; int64_t last_fetch_ts_; // 最后一次拉日志时间 palf::LSN max_submit_lsn_; // 提交远程日志拉取任务最大LSN palf::LSN max_fetch_lsn_; // 拉到最后一条日志end_lsn @@ -40,8 +41,10 @@ struct ObRemoteFetchContext ObRemoteFetchContext &operator=(const ObRemoteFetchContext &other); void reset(); int reset_sorted_tasks(); - TO_STRING_KV(K_(issue_task_num), K_(last_fetch_ts), K_(max_submit_lsn), K_(max_fetch_lsn), - K_(max_fetch_scn), K_(error_context), "task_count", submit_array_.count()); + void set_issue_version(); + TO_STRING_KV(K_(issue_task_num), K_(issue_version), K_(last_fetch_ts), + K_(max_submit_lsn), K_(max_fetch_lsn), K_(max_fetch_scn), + K_(error_context), "task_count", submit_array_.count()); }; } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_remote_fetch_log.cpp b/src/logservice/restoreservice/ob_remote_fetch_log.cpp index 4d326f2e4..565c54524 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log.cpp +++ b/src/logservice/restoreservice/ob_remote_fetch_log.cpp @@ -125,6 +125,7 @@ int ObRemoteFetchLogImpl::do_fetch_log_(ObLS &ls) bool can_fetch_log = false; bool need_schedule = false; int64_t proposal_id = -1; + int64_t version = 0; LSN lsn; SCN pre_scn; // scn to locate piece LSN max_fetch_lsn; @@ -138,13 +139,13 @@ int ObRemoteFetchLogImpl::do_fetch_log_(ObLS &ls) LOG_WARN("check replica status failed", K(ret), K(ls)); } else if (! can_fetch_log) { // just skip - } else if (OB_FAIL(check_need_schedule_(ls, need_schedule, - proposal_id, max_fetch_lsn, last_fetch_ts, task_count))) { + } else if (OB_FAIL(check_need_schedule_(ls, need_schedule, proposal_id, + version, max_fetch_lsn, last_fetch_ts, task_count))) { LOG_WARN("check need schedule failed", K(ret), K(id)); } else if (! need_schedule) { } else if (OB_FAIL(get_fetch_log_base_lsn_(ls, max_fetch_lsn, last_fetch_ts, pre_scn, lsn))) { LOG_WARN("get fetch log base lsn failed", K(ret), K(id)); - } else if (OB_FAIL(submit_fetch_log_task_(ls, pre_scn, lsn, task_count, proposal_id))) { + } else if (OB_FAIL(submit_fetch_log_task_(ls, pre_scn, lsn, task_count, proposal_id, version))) { LOG_WARN("submit fetch log task failed", K(ret), K(id), K(lsn), K(task_count)); } else { LOG_TRACE("do fetch log succ", K(id), K(lsn), K(task_count)); @@ -169,6 +170,7 @@ int ObRemoteFetchLogImpl::check_replica_status_(ObLS &ls, bool &can_fetch_log) int ObRemoteFetchLogImpl::check_need_schedule_(ObLS &ls, bool &need_schedule, int64_t &proposal_id, + int64_t &version, LSN &lsn, int64_t &last_fetch_ts, int64_t &task_count) @@ -195,6 +197,7 @@ int ObRemoteFetchLogImpl::check_need_schedule_(ObLS &ls, } else if (need_delay) { need_schedule = false; } else { + version = context.issue_version_; lsn = context.max_submit_lsn_; last_fetch_ts = context.last_fetch_ts_; task_count = concurrency - context.issue_task_num_; @@ -264,7 +267,8 @@ int ObRemoteFetchLogImpl::submit_fetch_log_task_(ObLS &ls, const SCN &scn, const LSN &lsn, const int64_t task_count, - const int64_t proposal_id) + const int64_t proposal_id, + const int64_t version) { int ret = OB_SUCCESS; LSN start_lsn = lsn; @@ -275,7 +279,7 @@ int ObRemoteFetchLogImpl::submit_fetch_log_task_(ObLS &ls, const LSN end_lsn = LSN((start_lsn.val_ / max_size + 1) * max_size); const int64_t size = static_cast(end_lsn - start_lsn); scheduled = false; - if (OB_FAIL(do_submit_fetch_log_task_(ls, scn, start_lsn, size, proposal_id, scheduled))) { + if (OB_FAIL(do_submit_fetch_log_task_(ls, scn, start_lsn, size, proposal_id, version, scheduled))) { LOG_WARN("do submit fetch log task failed", K(ret), K(ls)); } else if (! scheduled) { break; @@ -291,6 +295,7 @@ int ObRemoteFetchLogImpl::do_submit_fetch_log_task_(ObLS &ls, const LSN &lsn, const int64_t size, const int64_t proposal_id, + const int64_t version, bool &scheduled) { int ret = OB_SUCCESS; @@ -302,12 +307,12 @@ int ObRemoteFetchLogImpl::do_submit_fetch_log_task_(ObLS &ls, ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret), K(id)); } else { - task = new (data) ObFetchLogTask(id, scn, lsn, size, proposal_id); + task = new (data) ObFetchLogTask(id, scn, lsn, size, proposal_id, version); ObLogRestoreHandler *restore_handler = NULL; if (OB_ISNULL(restore_handler = ls.get_log_restore_handler())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("get restore_handler failed", K(ret), K(ls)); - } else if (OB_FAIL(restore_handler->schedule(id.id(), proposal_id, lsn + size, scheduled))) { + } else if (OB_FAIL(restore_handler->schedule(id.id(), proposal_id, version, lsn + size, scheduled))) { LOG_WARN("schedule failed", K(ret), K(ls)); } else if (! scheduled) { // not scheduled diff --git a/src/logservice/restoreservice/ob_remote_fetch_log.h b/src/logservice/restoreservice/ob_remote_fetch_log.h index c23cf2df2..0d81c8aa6 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log.h +++ b/src/logservice/restoreservice/ob_remote_fetch_log.h @@ -61,12 +61,14 @@ private: int do_fetch_log_(ObLS &ls); int check_replica_status_(ObLS &ls, bool &can_fetch_log); int check_need_schedule_(ObLS &ls, bool &need_schedule, int64_t &proposal_id, - LSN &lsn, int64_t &last_fetch_ts, int64_t &task_count); + int64_t &version, LSN &lsn, int64_t &last_fetch_ts, int64_t &task_count); int check_need_delay_(const ObLSID &id, bool &need_delay); int get_fetch_log_base_lsn_(ObLS &ls, const LSN &max_fetch_lsn, const int64_t last_fetch_ts, share::SCN &scn, LSN &lsn); int get_palf_base_lsn_scn_(ObLS &ls, LSN &lsn, share::SCN &scn); - int submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t task_count, const int64_t proposal_id); - int do_submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t size, const int64_t proposal_id, bool &scheduled); + int submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, + const int64_t task_count, const int64_t proposal_id, const int64_t version); + int do_submit_fetch_log_task_(ObLS &ls, const share::SCN &scn, const LSN &lsn, const int64_t size, + const int64_t proposal_id, const int64_t version, bool &scheduled); private: bool inited_; diff --git a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp index 206cd2e9e..378d8e9e2 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp +++ b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp @@ -239,6 +239,7 @@ void ObRemoteFetchWorker::do_thread_task_() int ObRemoteFetchWorker::handle_single_task_() { + DEBUG_SYNC(BEFORE_RESTORE_HANDLE_FETCH_LOG_TASK); int ret = OB_SUCCESS; void *data = NULL; if (OB_FAIL(task_queue_.pop(data))) { diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 8ad0328a4..9f805771e 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -448,6 +448,7 @@ class ObString; ACT(BEFORE_BUILD_TABLET_GROUP_INFO,)\ ACT(BEFORE_RESTORE_SERVICE_PUSH_FETCH_DATA,)\ ACT(AFTER_MIGRATION_REPORT_LS_META_TABLE,)\ + ACT(BEFORE_RESTORE_HANDLE_FETCH_LOG_TASK,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);