diff --git a/deps/oblib/src/lib/thread/ob_thread_lease.h b/deps/oblib/src/lib/thread/ob_thread_lease.h index 3a57ce849..aa9094b66 100644 --- a/deps/oblib/src/lib/thread/ob_thread_lease.h +++ b/deps/oblib/src/lib/thread/ob_thread_lease.h @@ -48,6 +48,10 @@ public: } public: + bool is_idle() const + { + return IDLE == ATOMIC_LOAD(&status_); + } bool acquire() { bool bool_ret = false; diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index 510e22b52..38a857ee3 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -574,6 +574,8 @@ class EventTable EN_UNDO_ACTIONS_SIZE_OVERFLOW = 276, EN_PART_PLUS_UNDO_OVERFLOW = 277, EN_HANDLE_PREPARE_MESSAGE_EAGAIN = 278, + EN_RC_ONLY_LEADER_TO_LEADER = 279, + EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP = 280, //simulate DAS errors 301-350 EN_DAS_SCAN_RESULT_OVERFLOW = 301, diff --git a/src/logservice/rcservice/ob_role_change_service.cpp b/src/logservice/rcservice/ob_role_change_service.cpp index 6faeb7463..123e53c93 100644 --- a/src/logservice/rcservice/ob_role_change_service.cpp +++ b/src/logservice/rcservice/ob_role_change_service.cpp @@ -25,6 +25,7 @@ #include "storage/tx_storage/ob_ls_service.h" #include "storage/tx_storage/ob_ls_handle.h" #include "share/ob_occam_time_guard.h" +#include "observer/ob_server_event_history_table_operator.h" namespace oceanbase { @@ -323,7 +324,15 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event, switch (event.event_type_) { case RoleChangeEventType::CHANGE_LEADER_EVENT_TYPE: CLOG_LOG(INFO, "begin change leader", K(curr_access_mode), K(event), KPC(ls)); - if (is_append_mode(curr_access_mode) +#ifdef ERRSIM + ret = OB_E(EventTable::EN_RC_ONLY_LEADER_TO_LEADER) OB_SUCCESS; + if (OB_FAIL(ret)) { + ls->get_log_restore_handler()->change_leader_to(event.dst_addr_); + CLOG_LOG(INFO, "fake EN_RC_ONLY_LEADER_TO_LEADER with change_leader_event", KPC(ls), K(event)); + } +#endif + if (OB_FAIL(ret)) { + } else if (is_append_mode(curr_access_mode) && OB_FAIL(handle_change_leader_event_for_log_handler_(event.dst_addr_, ls))) { CLOG_LOG(WARN, "ObLogHandler change leader failed", K(ret), K(event), KPC(ls)); } else if (is_raw_write_or_flashback_mode(curr_access_mode) @@ -456,39 +465,51 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_( K(new_role), K(new_proposal_id), K(is_pending_state), K(log_handler_is_offline)); } else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, only_need_change_to_follower))) { } else { + +#ifdef ERRSIM + ret = OB_E(EventTable::EN_RC_ONLY_LEADER_TO_LEADER) OB_SUCCESS; + if (RoleChangeOptType::LEADER_2_LEADER == opt_type) { + ret = OB_SUCCESS; + } + if (OB_FAIL(ret)) { + CLOG_LOG(INFO, "fake EN_RC_ONLY_LEADER_TO_LEADER with role_change_event", KPC(ls), K(opt_type)); + } +#endif + switch (opt_type) { // leader -> follower case RoleChangeOptType::LEADER_2_FOLLOWER: - if (OB_FAIL(switch_leader_to_follower_forcedly_(new_proposal_id, ls))) { + if (OB_SUCC(ret) && OB_FAIL(switch_leader_to_follower_forcedly_(new_proposal_id, ls))) { CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(curr_role), - K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); + K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); } break; - // follower -> follower + // follower -> follower case RoleChangeOptType::FOLLOWER_2_LEADER: - if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) { - CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(curr_role), - K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); + if (OB_SUCC(ret) && OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) { + CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(curr_role), K(curr_proposal_id), K(new_role), + K(curr_access_mode), K(new_proposal_id)); } break; - // leader -> leader + // leader -> leader case RoleChangeOptType::LEADER_2_LEADER: - if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls, retry_ctx))) { + if (OB_SUCC(ret) && OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, + ls, retry_ctx))) { CLOG_LOG(WARN, "switch_leader_to_leader_ failed", K(ret), K(curr_role), - K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); + K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); } break; - // follower -> follower - case RoleChangeOptType::FOLLOWER_2_FOLLOWER: - if (OB_FAIL(switch_follower_to_follower_(new_proposal_id, ls))) { - CLOG_LOG(WARN, "switch_follower_to_follower_ failed", K(ret), K(curr_role), - K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); - } - break; - default: - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, "unexpected error, can not handle role change", K(ret), K(curr_role), - K(curr_proposal_id), K(new_role), K(new_proposal_id), KPC(ls)); + // follower -> follower + case RoleChangeOptType::FOLLOWER_2_FOLLOWER: + if (OB_SUCC(ret) && OB_FAIL(switch_follower_to_follower_(new_proposal_id, ls))) { + CLOG_LOG(WARN, "switch_follower_to_follower_ failed", K(curr_role), K(curr_proposal_id), + K(new_role), K(curr_access_mode), K(new_proposal_id)); + } + break; + default: + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "unexpected error, can not handle role change", K(ret), + K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id), KPC(ls)); } } return ret; @@ -585,6 +606,10 @@ int ObRoleChangeService::switch_follower_to_leader_( CLOG_LOG(WARN, "apply_service_ switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id)); } else if (FALSE_IT(time_guard.click("replay_service->switch_to_leader")) || OB_FAIL(replay_service_->switch_to_leader(ls_id))) { + CLOG_LOG(WARN, "replay_service_ switch_to_leader failed", K(new_role), K(new_proposal_id)); + } else if (FALSE_IT(time_guard.click("wait_replay_service_submit_task_clear_")) + || OB_FAIL(wait_replay_service_submit_task_clear_(ls_id))) { + CLOG_LOG(ERROR, "wait_replay_service_submit_task_clear_ failed", K(new_role), K(new_proposal_id)); } else if (FALSE_IT(log_handler->switch_role(new_role, new_proposal_id))) { CLOG_LOG(WARN, "ObLogHandler switch role failed", K(ret), K(new_role), K(new_proposal_id)); } else if (FALSE_IT(ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE))) { @@ -712,6 +737,9 @@ int ObRoleChangeService::switch_leader_to_leader_( RetrySubmitRoleChangeEventCtx &retry_ctx) { int ret = OB_SUCCESS; + #ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("LOGSERVICE", "BEFORE_LEADER_TO_LEADER_RC"); + #endif ObTimeGuard time_guard("switch_leader_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME); if (FALSE_IT(time_guard.click("switch_leader_to_follower_forcedly_")) || OB_FAIL(switch_leader_to_follower_forcedly_(curr_proposal_id, ls))) { @@ -846,6 +874,25 @@ int ObRoleChangeService::wait_replay_service_replay_done_( return ret; } +int ObRoleChangeService::wait_replay_service_submit_task_clear_(const share::ObLSID &ls_id) +{ + int ret = OB_SUCCESS; + bool is_clear = false; + const int64_t start_ts = ObTimeUtility::current_time(); + while (OB_SUCC(ret) && (!is_clear)) { + if (OB_FAIL(replay_service_->is_submit_task_clear(ls_id, is_clear))) { + CLOG_LOG(WARN, "replay_service_ is_submit_task_clean failed", K(is_clear)); + } else if (!is_clear) { + ob_usleep(1 * 1000); + if (REACH_TIME_INTERVAL(100 * 1000L)) { + CLOG_LOG(WARN, "submit_task is not clear, need retry", K(ls_id), K(start_ts)); + } + } else {/*do nothing*/} + } + CLOG_LOG(INFO, "wait_replay_service_submit_task_clear_ finish", K(ls_id), K(is_clear)); + return ret; +} + int ObRoleChangeService::wait_apply_service_apply_done_( const share::ObLSID &ls_id, palf::LSN &end_lsn) diff --git a/src/logservice/rcservice/ob_role_change_service.h b/src/logservice/rcservice/ob_role_change_service.h index 93d24a5bc..7f4c8f068 100644 --- a/src/logservice/rcservice/ob_role_change_service.h +++ b/src/logservice/rcservice/ob_role_change_service.h @@ -165,6 +165,8 @@ private: int wait_replay_service_replay_done_(const share::ObLSID &ls_id, const palf::LSN &end_lsn, const int64_t timeout_us); + + int wait_replay_service_submit_task_clear_(const share::ObLSID &ls_id); int wait_apply_service_apply_done_(const share::ObLSID &ls_id, palf::LSN &end_lsn); int wait_apply_service_apply_done_when_change_leader_(const ObLogHandler *log_handler, diff --git a/src/logservice/replayservice/ob_log_replay_service.cpp b/src/logservice/replayservice/ob_log_replay_service.cpp index 68d58b717..e0bdc7f8a 100644 --- a/src/logservice/replayservice/ob_log_replay_service.cpp +++ b/src/logservice/replayservice/ob_log_replay_service.cpp @@ -21,6 +21,7 @@ #include "share/ob_thread_mgr.h" #include "share/rc/ob_tenant_base.h" #include "storage/tx_storage/ob_tenant_freezer.h" +#include "observer/ob_server_event_history_table_operator.h" namespace oceanbase { @@ -577,6 +578,27 @@ int ObLogReplayService::is_replay_done(const share::ObLSID &id, return ret; } +int ObLogReplayService::is_submit_task_clear(const share::ObLSID &id, bool &is_clear) +{ + int ret = OB_SUCCESS; + ObReplayStatus *replay_status = NULL; + ObReplayStatusGuard guard; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "replay service not init", K(ret)); + } else if (OB_FAIL(get_replay_status_(id, guard))) { + CLOG_LOG(WARN, "guard get replay status failed", K(id)); + } else if (NULL == (replay_status = guard.get_replay_status())) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(WARN, "replay status is not exist", K(id)); + } else if (OB_FAIL(replay_status->is_submit_task_clear(is_clear))){ + CLOG_LOG(WARN, "check replay done failed", K(id)); + } else { + // do nothing + } + return ret; +} + //通用接口, 受控回放时最终返回值为受控回放点前的最后一条日志的log_ts int ObLogReplayService::get_max_replayed_scn(const share::ObLSID &id, SCN &scn) { @@ -850,6 +872,10 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task, CLOG_LOG(ERROR, "invalid argument", KPC(replay_task), KPC(replay_status), KR(ret)); } else if (OB_ISNULL(ls_adapter_)) { ret = OB_NOT_INIT; + } else if (OB_FAIL(replay_status->check_can_replay())) { + if (REACH_TIME_INTERVAL(1000 * 1000)) { + CLOG_LOG(INFO, "can not replay log", KPC(replay_status), KPC(replay_task)); + } } else if (OB_FAIL(replay_status->check_replay_barrier(replay_task, replay_log_buff, need_replay, replay_queue_idx))) { if (REACH_TIME_INTERVAL(1000 * 1000)) { @@ -1117,6 +1143,13 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta if (!replay_status->is_enabled_without_lock() || !replay_status->need_submit_log()) { need_submit_log = false; } else { +#ifdef ERRSIM + int tmp_ret = OB_E(EventTable::EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP) OB_SUCCESS; + if (OB_SUCCESS != tmp_ret) { + usleep(300 * 1000); + CLOG_LOG(INFO, "sleep 300ms before read log", KPC(submit_task), KPC(replay_status), KR(tmp_ret)); + } +#endif const SCN &replayable_point = inner_get_replayable_point_(); need_submit_log = submit_task->has_remained_submit_log(replayable_point, iterate_end_by_replayable_point); diff --git a/src/logservice/replayservice/ob_log_replay_service.h b/src/logservice/replayservice/ob_log_replay_service.h index b3dff482a..44782a5e2 100644 --- a/src/logservice/replayservice/ob_log_replay_service.h +++ b/src/logservice/replayservice/ob_log_replay_service.h @@ -57,6 +57,7 @@ class ObILogReplayService { public: virtual int is_replay_done(const share::ObLSID &id, const palf::LSN &end_lsn, bool &is_done) = 0; + virtual int is_submit_task_clear(const share::ObLSID &id, bool &is_clear) = 0; virtual int switch_to_follower(const share::ObLSID &id, const palf::LSN &begin_lsn) = 0; virtual int switch_to_leader(const share::ObLSID &id) = 0; }; @@ -147,6 +148,7 @@ public: int is_replay_done(const share::ObLSID &id, const palf::LSN &end_lsn, bool &is_done); + int is_submit_task_clear(const share::ObLSID &id, bool &is_clear); int get_max_replayed_scn(const share::ObLSID &id, share::SCN &scn); int submit_task(ObReplayServiceTask *task); int update_replayable_point(const share::SCN &replayable_scn); diff --git a/src/logservice/replayservice/ob_replay_status.cpp b/src/logservice/replayservice/ob_replay_status.cpp index f50439820..b985a023a 100644 --- a/src/logservice/replayservice/ob_replay_status.cpp +++ b/src/logservice/replayservice/ob_replay_status.cpp @@ -282,6 +282,20 @@ int ObReplayServiceSubmitTask::next_log(const SCN &replayable_point, int ObReplayServiceSubmitTask::reset_iterator(PalfHandle &palf_handle, const LSN &begin_lsn) { int ret = OB_SUCCESS; + /* + * next_to_submit_lsn_ may be bigger than begin_lsn; + current replica is A + first revoke ObRoleChangeService::switch_follower_to_leader_(): + T1 A invoke log_handler->get_end_lsn(end_lsn) and end_lsn is 100. + T2 another replica B switches to leader and submits log with lsn[101, 200]. + T3 A invoke replay_service_->switch_to_leader(ls_id) + T4 A failed with invoking role_change_handler->switch_to_leader(cur_task_info_). + replica A replayed log [101, 150) between T1 and T3. + then + T5 replica A invoke ObRoleChangeService::leader_to_follower_forcedly_() + within leader_to_follower_forcedly_(), parameter end_lsn passed to + replay_service_->switch_to_follower(ls_id, end_lsn) is 100 while next_to_submit_lsn_ is 150. + */ next_to_submit_lsn_ = std::max(next_to_submit_lsn_, begin_lsn); if (OB_FAIL(palf_handle.seek(next_to_submit_lsn_, iterator_))) { ret = OB_ERR_UNEXPECTED; @@ -815,6 +829,15 @@ void ObReplayStatus::switch_to_follower(const palf::LSN &begin_lsn) role_ = FOLLOWER; } while (0); +#ifdef ERRSIM +int tmp_ret = OB_E(EventTable::EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP) OB_SUCCESS; +if (OB_SUCCESS != tmp_ret) { + CLOG_LOG(INFO, "fake EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP ", KPC(this), K(begin_lsn)); + SERVER_EVENT_SYNC_ADD("REPLAYSERVICE", "BEFORE_PUSH_SUBMIT_TASK"); +} + DEBUG_SYNC(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK); +#endif + RLockGuard rguard(rwlock_); if (!is_enabled_) { // do nothing @@ -871,6 +894,20 @@ int ObReplayStatus::is_replay_done(const LSN &end_lsn, return ret; } +int ObReplayStatus::is_submit_task_clear(bool &is_clear) const +{ + int ret = OB_SUCCESS; + is_clear = false; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + CLOG_LOG(ERROR, "replay status has not been inited", K(ret)); + } else { + RLockGuard rlock_guard(rwlock_); + is_clear = submit_log_task_.is_idle(); + } + return ret; +} + int ObReplayStatus::update_end_offset(const LSN &lsn) { int ret = OB_SUCCESS; @@ -1376,7 +1413,7 @@ int ObReplayStatus::trigger_fetch_log() { int ret = OB_SUCCESS; RLockGuard rlock_guard(rwlock_); - if (is_enabled_) { + if (is_enabled_ && need_submit_log()) { if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) { CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_), KPC(this), K(ret)); @@ -1386,5 +1423,18 @@ int ObReplayStatus::trigger_fetch_log() } return ret; } + +int ObReplayStatus::check_can_replay() const +{ + int ret = OB_SUCCESS; + { + RLockGuard guard(rolelock_); + if (FOLLOWER != role_) { + ret = OB_EAGAIN; + } + } + return ret; +} + } // namespace logservice } diff --git a/src/logservice/replayservice/ob_replay_status.h b/src/logservice/replayservice/ob_replay_status.h index 65bed115b..4e6650bda 100644 --- a/src/logservice/replayservice/ob_replay_status.h +++ b/src/logservice/replayservice/ob_replay_status.h @@ -205,6 +205,11 @@ public: return lease_.revoke(); } + bool is_idle() const + { + return lease_.is_idle(); + } + ObReplayServiceTaskType get_type() const { return type_; @@ -477,8 +482,13 @@ public: // // @return : OB_SUCCESS : success // OB_NOT_INIT: ObReplayStatus has not been inited - int is_replay_done(const palf::LSN &lsn, - bool &is_done); + int is_replay_done(const palf::LSN &lsn, bool &is_done); + + //check whether submit task is in the global queue of ObLogReplayService + //@param [out] : true if submit task is not in the global queue + //@return : OB_SUCCESS : success + //OB_NOT_INIT: ObReplayStatus has not been inited + int is_submit_task_clear(bool &is_clear) const; // 存在待回放的已提交日志任务 bool has_remained_replay_task() const; // update right margin of logs that need to replay @@ -513,6 +523,8 @@ public: ObLogReplayBuffer *&replay_log_buf, bool &need_replay, const int64_t replay_queue_idx); + //check whether can replay log so far + int check_can_replay() const; void set_post_barrier_submitted(const palf::LSN &lsn); int set_post_barrier_finished(const palf::LSN &lsn); int trigger_fetch_log(); @@ -571,6 +583,7 @@ private: // 注销回调并清空任务 int disable_(); bool is_replay_enabled_() const; + private: static const int64_t PENDING_COUNT_THRESHOLD = 100; static const int64_t EAGAIN_COUNT_THRESHOLD = 50000; diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index f4d905833..bbd3a52e5 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -570,6 +570,7 @@ class ObString; ACT(BEFORE_WAIT_SYS_LS_END_SCN,)\ ACT(BEFORE_CREATE_CLONE_TENANT_END,)\ ACT(BEFORE_CALC_CONSISTENT_SCN,)\ + ACT(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);