From 7c63a546e799f8c028cfb74a286bcc0ce4d56f58 Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Tue, 11 Jul 2023 05:24:25 +0000 Subject: [PATCH] [CP] push RoleChangeEvent to the tail of queue when replay timeout --- .../test_role_change_service.cpp | 41 +++++++++++ .../rcservice/ob_role_change_service.cpp | 62 +++++++++++------ .../rcservice/ob_role_change_service.h | 68 ++++++++++++++----- 3 files changed, 136 insertions(+), 35 deletions(-) diff --git a/mittest/simple_server/test_role_change_service.cpp b/mittest/simple_server/test_role_change_service.cpp index bbc7ba1267..d0e09be75b 100644 --- a/mittest/simple_server/test_role_change_service.cpp +++ b/mittest/simple_server/test_role_change_service.cpp @@ -93,8 +93,48 @@ TEST_F(RoleChangeService, unique_set) } } +TEST_F(RoleChangeService, basic_func) +{ + CLOG_LOG(INFO, "start basic_func"); + const char *tenant_name = "runlin"; + EXPECT_EQ(OB_SUCCESS, create_tenant(tenant_name)); + uint64_t tenant_id = 0; + EXPECT_EQ(OB_SUCCESS, get_tenant_id(tenant_id, tenant_name)); + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + // switch the execution context to tenant 'runlin' + ASSERT_EQ(OB_SUCCESS, guard.switch_to(tenant_id)); + ObLogService *log_service = MTL(ObLogService*); + ASSERT_NE(nullptr, log_service); + ObRoleChangeService *role_change_service = &log_service->role_change_service_; + ASSERT_NE(nullptr, role_change_service); + ObLSService *ls_service = MTL(ObLSService*); + ASSERT_NE(nullptr, ls_service); + { + ObLSHandle ls; + ASSERT_EQ(OB_SUCCESS, ls_service->get_ls(ObLSID(1001), ls, ObLSGetMod::LOG_MOD)); + // stop replay, so that waiting for replay done is expected to time out + EXPECT_EQ(OB_SUCCESS, ls.get_ls()->disable_replay()); + ObLogHandler *log_handler = &ls.get_ls()->log_handler_; + EXPECT_EQ(LEADER, log_handler->role_); + log_handler->role_ = FOLLOWER; + RoleChangeEvent event_stack; + event_stack.event_type_ = RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE; + event_stack.ls_id_ = ObLSID(1001); + ObRoleChangeService::RetrySubmitRoleChangeEventCtx retry_ctx; + EXPECT_EQ(OB_TIMEOUT, 
role_change_service->handle_role_change_cb_event_for_log_handler_(palf::AccessMode::APPEND, ls.get_ls(), retry_ctx)); + EXPECT_EQ(retry_ctx.reason_, ObRoleChangeService::RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT); + EXPECT_EQ(retry_ctx.need_retry(), true); + EXPECT_EQ(OB_SUCCESS, role_change_service->on_role_change(1001)); + sleep(10); + } + EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin")); + CLOG_LOG(INFO, "end basic_func"); +} + + TEST_F(RoleChangeService, test_offline) { + CLOG_LOG(INFO, "start test_offline"); EXPECT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy_with_short_wait()); share::ObTenantSwitchGuard tguard; ASSERT_EQ(OB_SUCCESS, tguard.switch_to(1)); @@ -153,6 +193,7 @@ TEST_F(RoleChangeService, test_offline) EXPECT_EQ(common::FOLLOWER, log_handler->role_); EXPECT_EQ(true, log_handler->is_offline_); EXPECT_EQ(-1, log_handler->apply_status_->proposal_id_); + CLOG_LOG(INFO, "end test_offline"); } } // end unittest diff --git a/src/logservice/rcservice/ob_role_change_service.cpp b/src/logservice/rcservice/ob_role_change_service.cpp index f483fc95d1..6faeb7463c 100644 --- a/src/logservice/rcservice/ob_role_change_service.cpp +++ b/src/logservice/rcservice/ob_role_change_service.cpp @@ -210,18 +210,23 @@ void ObRoleChangeService::handle(void *task) // TIMEGUARD will pring lbt(). 
TIMEGUARD_INIT(CLOG, 30_s, 30_s); RoleChangeEvent *event = reinterpret_cast<RoleChangeEvent*>(task); + const int64_t ls_id = (NULL != event) ? event->ls_id_.id() : -1; /* 'event' may be NULL (checked below), so it must not be dereferenced unconditionally; -1 is a placeholder that is never used, because a retry is only requested while handling a non-NULL event */ const int64_t start_ts = ObTimeUtility::current_time(); + RetrySubmitRoleChangeEventCtx retry_ctx; CLOG_LOG(INFO, "begin handle_role_change_event_", "sequence:", start_ts, KPC(event)); if (NULL == event) { CLOG_LOG(WARN, "unexpected error, task is nullptr", KP(event)); - } else if (OB_FAIL(handle_role_change_event_(*event))) { - CLOG_LOG(WARN, "handle_role_change_event_ failed", K(ret), KPC(event)); + } else if (OB_FAIL(handle_role_change_event_(*event, retry_ctx))) { + CLOG_LOG(WARN, "handle_role_change_event_ failed", K(ret), KPC(event), K(retry_ctx)); } else { CLOG_LOG(INFO, "end handle_role_change_event_", "sequence:", start_ts, KPC(event)); } if (NULL != event) { OB_DELETE(RoleChangeEvent, "RCService", event); } + if (retry_ctx.need_retry() && OB_FAIL(on_role_change(ls_id))) { + CLOG_LOG(WARN, "retry submit role change event failed", K(ret), K(ls_id), K(retry_ctx)); + } } int ObRoleChangeService::on_role_change(const int64_t id) @@ -296,7 +301,8 @@ int ObRoleChangeService::push_event_into_queue_(const RoleChangeEvent &event) return ret; } -int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event) +int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event, + RetrySubmitRoleChangeEventCtx &retry_ctx) { int ret = OB_SUCCESS; ObLSHandle ls_handle; @@ -328,13 +334,13 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event) break; case RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE: CLOG_LOG(INFO, "begin log handler role change", K(curr_access_mode), K(event), KPC(ls)); - if (OB_FAIL(handle_role_change_cb_event_for_log_handler_(curr_access_mode, ls))) { + if (OB_FAIL(handle_role_change_cb_event_for_log_handler_(curr_access_mode, ls, retry_ctx))) { CLOG_LOG(WARN, "handle_role_change_cb_event_for_log_handler_ failed", K(ret), K(curr_access_mode), KPC(ls)); } - CLOG_LOG(INFO, "end 
log handler role change", K(ret), K(curr_access_mode), K(event), KPC(ls), K(retry_ctx)); CLOG_LOG(INFO, "begin restore handler role change", K(curr_access_mode), K(event), KPC(ls)); - if (OB_FAIL(handle_role_change_cb_event_for_restore_handler_(curr_access_mode, ls))) { + if (!retry_ctx.need_retry() && OB_FAIL(handle_role_change_cb_event_for_restore_handler_(curr_access_mode, ls))) { CLOG_LOG(WARN, "handle_role_change_cb_event_for_restore_handler_ failed", K(ret), K(curr_access_mode), KPC(ls)); } @@ -345,7 +351,7 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event) CLOG_LOG(WARN, "unexpected role change event type", K(ret)); } } - if (OB_SUCC(ret) && OB_NOT_NULL(ls)) { + if (OB_SUCC(ret) && OB_NOT_NULL(ls) && !retry_ctx.need_retry()) { (void)ls->report_replica_info(); } return ret; @@ -421,7 +427,8 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_( int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_( const AccessMode &curr_access_mode, - ObLS *ls) + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx) { int ret = OB_SUCCESS; const bool log_handler_is_offline = ls->get_log_handler()->is_offline(); @@ -459,14 +466,14 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_( break; // follower -> follower case RoleChangeOptType::FOLLOWER_2_LEADER: - if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls))) { + if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) { CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(curr_role), K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); } break; // leader -> leader case RoleChangeOptType::LEADER_2_LEADER: - if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls))) { + if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls, retry_ctx))) { 
CLOG_LOG(WARN, "switch_leader_to_leader_ failed", K(ret), K(curr_role), K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id)); } @@ -549,7 +556,8 @@ int ObRoleChangeService::handle_change_leader_event_for_log_handler_( int ObRoleChangeService::switch_follower_to_leader_( const int64_t new_proposal_id, - ObLS *ls) + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx) { int ret = OB_SUCCESS; const ObRole new_role = LEADER; @@ -566,8 +574,12 @@ int ObRoleChangeService::switch_follower_to_leader_( // We must guarantee that 'replay_service_' has replayed complete data, and before // stop 'replay_service_', other components can not submit log. } else if (FALSE_IT(time_guard.click("wait_replay_service_apply_done_")) - || OB_FAIL(wait_replay_service_replay_done_(ls_id, end_lsn))) { - CLOG_LOG(WARN, "wait_replay_service_replay_done_ failed", K(ret), K(end_lsn)); + || OB_FAIL(wait_replay_service_replay_done_(ls_id, end_lsn, WAIT_REPLAY_DONE_TIMEOUT_US))) { + if (need_retry_submit_role_change_event_(ret)) { + retry_ctx.set_retry_reason(RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT); + } else { + CLOG_LOG(WARN, "wait_replay_service_replay_done_ failed", K(ret), K(end_lsn)); + } } else if (FALSE_IT(time_guard.click("apply_service->switch_to_leader")) || OB_FAIL(apply_service_->switch_to_leader(ls_id, new_proposal_id))) { CLOG_LOG(WARN, "apply_service_ switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id)); @@ -584,7 +596,7 @@ int ObRoleChangeService::switch_follower_to_leader_( ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE); CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls)); } - if (OB_FAIL(ret)) { + if (OB_FAIL(ret) && !retry_ctx.need_retry()) { log_handler->revoke_leader(); CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), KPC(ls)); } @@ -696,7 +708,8 @@ int ObRoleChangeService::switch_leader_to_follower_gracefully_( int ObRoleChangeService::switch_leader_to_leader_( 
const int64_t new_proposal_id, const int64_t curr_proposal_id, - ObLS *ls) + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx) { int ret = OB_SUCCESS; ObTimeGuard time_guard("switch_leader_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME); @@ -704,7 +717,7 @@ int ObRoleChangeService::switch_leader_to_leader_( || OB_FAIL(switch_leader_to_follower_forcedly_(curr_proposal_id, ls))) { CLOG_LOG(WARN, "switch_leader_to_leader_, switch leader to follower failed", K(ret), KPC(ls)); } else if (FALSE_IT(time_guard.click("switch_follower_to_leader_")) - || OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls))) { + || OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) { CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(new_proposal_id)); } else { CLOG_LOG(INFO, "switch_leader_to_leader_ success", K(ret), KPC(ls)); @@ -813,16 +826,18 @@ int ObRoleChangeService::switch_leader_to_leader_restore_( int ObRoleChangeService::wait_replay_service_replay_done_( const share::ObLSID &ls_id, - const palf::LSN &end_lsn) + const palf::LSN &end_lsn, + const int64_t timeout_us) { int ret = OB_SUCCESS; bool is_done = false; const int64_t start_ts = ObTimeUtility::current_time(); - while (OB_SUCC(ret) && false == is_done) { + palf::TimeoutChecker not_timeout(timeout_us); + while (OB_SUCC(ret) && false == is_done && OB_SUCC(not_timeout())) { if (OB_FAIL(replay_service_->is_replay_done(ls_id, end_lsn, is_done))) { CLOG_LOG(WARN, "replay_service_ is_replay_done failed", K(ret), K(is_done), K(end_lsn)); } else if (false == is_done) { - ob_usleep(5*1000); + ob_usleep(50*1000); CLOG_LOG(INFO, "wait replay done return false, need retry", K(ls_id), K(end_lsn), K(start_ts)); } else { } @@ -959,5 +974,14 @@ bool ObRoleChangeService::is_raw_write_or_flashback_mode(const AccessMode &mode) AccessMode::PREPARE_FLASHBACK == mode); } +bool ObRoleChangeService::need_retry_submit_role_change_event_(int ret) const +{ + bool bool_ret = false; + if (OB_TIMEOUT == ret) { + bool_ret = 
true; + } + return bool_ret; +} + } // end namespace logservice } // end namespace oceanbase diff --git a/src/logservice/rcservice/ob_role_change_service.h b/src/logservice/rcservice/ob_role_change_service.h index 4be9600ccc..93d24a5bca 100644 --- a/src/logservice/rcservice/ob_role_change_service.h +++ b/src/logservice/rcservice/ob_role_change_service.h @@ -82,10 +82,44 @@ public: int on_need_change_leader(const int64_t ls_id, const common::ObAddr &dst_addr) final override; int diagnose(RCDiagnoseInfo &diagnose_info); +private: + enum class RoleChangeOptType { + INVALID_RC_OPT_TYPE = 0, + FOLLOWER_2_LEADER = 1, + LEADER_2_FOLLOWER = 2, + FOLLOWER_2_FOLLOWER = 3, + LEADER_2_LEADER = 4, + MAX_RC_OPT_TYPE = 5 + }; + enum class RetrySubmitRoleChangeEventReason { + INVALID_TYPE = 0, + WAIT_REPLAY_DONE_TIMEOUT = 1, + MAX_TYPE = 2 + }; + class RetrySubmitRoleChangeEventCtx { + public: + RetrySubmitRoleChangeEventCtx() : reason_(RetrySubmitRoleChangeEventReason::INVALID_TYPE) {} + ~RetrySubmitRoleChangeEventCtx() + { + reason_ = RetrySubmitRoleChangeEventReason::INVALID_TYPE; + } + bool need_retry() const + { + return RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT == reason_; + } + void set_retry_reason(const RetrySubmitRoleChangeEventReason &reason) + { + reason_ = reason; + } + TO_STRING_KV(K_(reason)); + private: + RetrySubmitRoleChangeEventReason reason_; + }; private: int submit_role_change_event_(const RoleChangeEvent &event); int push_event_into_queue_(const RoleChangeEvent &event); - int handle_role_change_event_(const RoleChangeEvent &event); + int handle_role_change_event_(const RoleChangeEvent &event, + RetrySubmitRoleChangeEventCtx &retry_ctx); int handle_role_change_cb_event_for_restore_handler_(const palf::AccessMode &curr_access_mode, ObLS *ls); @@ -93,25 +127,28 @@ private: ObLS *ls); int handle_role_change_cb_event_for_log_handler_(const palf::AccessMode &curr_access_mode, - ObLS *ls); + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx); 
int handle_change_leader_event_for_log_handler_(const common::ObAddr &dst_addr, ObLS *ls); + // retval + // - OB_SUCCESS + // - OB_TIMEOUT, means wait replay finish timeout. int switch_follower_to_leader_(const int64_t new_proposal_id, - ObLS *ls); + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx); int switch_leader_to_follower_forcedly_(const int64_t new_proposal_id, ObLS *ls); int switch_leader_to_follower_gracefully_(const int64_t new_proposal_id, const int64_t curr_proposal_id, const common::ObAddr &dst_addr, ObLS *ls); - int switch_leader_to_leader_(const common::ObRole &new_role, - const int64_t new_proposal_id, - ObLS *ls); int switch_follower_to_follower_(const int64_t new_proposal_id, ObLS *ls); int switch_leader_to_leader_(const int64_t new_proposal_id, const int64_t curr_proposal_id, - ObLS *ls); + ObLS *ls, + RetrySubmitRoleChangeEventCtx &retry_ctx); int switch_follower_to_leader_restore_(const int64_t new_proposal_id, ObLS *ls); @@ -124,8 +161,10 @@ private: int switch_leader_to_leader_restore_(const int64_t new_proposal_id, const int64_t curr_proposal_id, ObLS *ls); + // wait replay finish with timeout. 
int wait_replay_service_replay_done_(const share::ObLSID &ls_id, - const palf::LSN &end_lsn); + const palf::LSN &end_lsn, + const int64_t timeout_us); int wait_apply_service_apply_done_(const share::ObLSID &ls_id, palf::LSN &end_lsn); int wait_apply_service_apply_done_when_change_leader_(const ObLogHandler *log_handler, @@ -142,17 +181,13 @@ private: bool is_append_mode(const palf::AccessMode &access_mode) const; bool is_raw_write_or_flashback_mode(const palf::AccessMode &access_mode) const; private: - enum class RoleChangeOptType { - INVALID_RC_OPT_TYPE = 0, - FOLLOWER_2_LEADER = 1, - LEADER_2_FOLLOWER = 2, - FOLLOWER_2_FOLLOWER = 3, - LEADER_2_LEADER = 4, - MAX_RC_OPT_TYPE = 5 - }; RoleChangeOptType get_role_change_opt_type_(const common::ObRole &old_role, const common::ObRole &new_role, const bool need_transform_by_access_mode) const; + // retry submit role change event + // NB: nowadays, we only support retry submit role change event when wait replay finished + // timeout. + bool need_retry_submit_role_change_event_(int ret) const; public: static const int64_t MAX_THREAD_NUM = 1; static const int64_t MAX_RC_EVENT_TASK = 1024 * 1024; @@ -160,6 +195,7 @@ private: DISALLOW_COPY_AND_ASSIGN(ObRoleChangeService); private: static constexpr int64_t EACH_ROLE_CHANGE_COST_MAX_TIME = 1 * 1000 * 1000; + static constexpr int64_t WAIT_REPLAY_DONE_TIMEOUT_US = 2 * 1000 * 1000; storage::ObLSService *ls_service_; logservice::ObLogApplyService *apply_service_; logservice::ObILogReplayService *replay_service_;