[CP] push RoleChangeEvent to the tail of queue when replay timeout
This commit is contained in:
@ -93,8 +93,48 @@ TEST_F(RoleChangeService, unique_set)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(RoleChangeService, basic_func)
|
||||
{
|
||||
CLOG_LOG(INFO, "start basic_func");
|
||||
const char *tenant_name = "runlin";
|
||||
EXPECT_EQ(OB_SUCCESS, create_tenant(tenant_name));
|
||||
uint64_t tenant_id = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, get_tenant_id(tenant_id, tenant_name));
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
// 上下文切换为runlin租户
|
||||
ASSERT_EQ(OB_SUCCESS, guard.switch_to(tenant_id));
|
||||
ObLogService *log_service = MTL(ObLogService*);
|
||||
ASSERT_NE(nullptr, log_service);
|
||||
ObRoleChangeService *role_change_service = &log_service->role_change_service_;
|
||||
ASSERT_NE(nullptr, role_change_service);
|
||||
ObLSService *ls_service = MTL(ObLSService*);
|
||||
ASSERT_NE(nullptr, ls_service);
|
||||
{
|
||||
ObLSHandle ls;
|
||||
EXPECT_EQ(OB_SUCCESS, ls_service->get_ls(ObLSID(1001), ls, ObLSGetMod::LOG_MOD));
|
||||
// 停止回放
|
||||
EXPECT_EQ(OB_SUCCESS, ls.get_ls()->disable_replay());
|
||||
ObLogHandler *log_handler = &ls.get_ls()->log_handler_;
|
||||
EXPECT_EQ(LEADER, log_handler->role_);
|
||||
log_handler->role_ = FOLLOWER;
|
||||
RoleChangeEvent event_stack;
|
||||
event_stack.event_type_ = RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE;
|
||||
event_stack.ls_id_ = ObLSID(1001);
|
||||
ObRoleChangeService::RetrySubmitRoleChangeEventCtx retry_ctx;
|
||||
EXPECT_EQ(OB_TIMEOUT, role_change_service->handle_role_change_cb_event_for_log_handler_(palf::AccessMode::APPEND, ls.get_ls(), retry_ctx));
|
||||
EXPECT_EQ(retry_ctx.reason_, ObRoleChangeService::RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT);
|
||||
EXPECT_EQ(retry_ctx.need_retry(), true);
|
||||
EXPECT_EQ(OB_SUCCESS, role_change_service->on_role_change(1001));
|
||||
sleep(10);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin"));
|
||||
CLOG_LOG(INFO, "end basic_func");
|
||||
}
|
||||
|
||||
|
||||
TEST_F(RoleChangeService, test_offline)
|
||||
{
|
||||
CLOG_LOG(INFO, "start test_offline");
|
||||
EXPECT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy_with_short_wait());
|
||||
share::ObTenantSwitchGuard tguard;
|
||||
ASSERT_EQ(OB_SUCCESS, tguard.switch_to(1));
|
||||
@ -153,6 +193,7 @@ TEST_F(RoleChangeService, test_offline)
|
||||
EXPECT_EQ(common::FOLLOWER, log_handler->role_);
|
||||
EXPECT_EQ(true, log_handler->is_offline_);
|
||||
EXPECT_EQ(-1, log_handler->apply_status_->proposal_id_);
|
||||
CLOG_LOG(INFO, "end test_offline");
|
||||
}
|
||||
|
||||
} // end unittest
|
||||
|
||||
@ -210,18 +210,23 @@ void ObRoleChangeService::handle(void *task)
|
||||
// TIMEGUARD will pring lbt().
|
||||
TIMEGUARD_INIT(CLOG, 30_s, 30_s);
|
||||
RoleChangeEvent *event = reinterpret_cast<RoleChangeEvent*>(task);
|
||||
const int64_t ls_id = event->ls_id_.id();
|
||||
const int64_t start_ts = ObTimeUtility::current_time();
|
||||
RetrySubmitRoleChangeEventCtx retry_ctx;
|
||||
CLOG_LOG(INFO, "begin handle_role_change_event_", "sequence:", start_ts, KPC(event));
|
||||
if (NULL == event) {
|
||||
CLOG_LOG(WARN, "unexpected error, task is nullptr", KP(event));
|
||||
} else if (OB_FAIL(handle_role_change_event_(*event))) {
|
||||
CLOG_LOG(WARN, "handle_role_change_event_ failed", K(ret), KPC(event));
|
||||
} else if (OB_FAIL(handle_role_change_event_(*event, retry_ctx))) {
|
||||
CLOG_LOG(WARN, "handle_role_change_event_ failed", K(ret), KPC(event), K(retry_ctx));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "end handle_role_change_event_", "sequence:", start_ts, KPC(event));
|
||||
}
|
||||
if (NULL != event) {
|
||||
OB_DELETE(RoleChangeEvent, "RCService", event);
|
||||
}
|
||||
if (retry_ctx.need_retry() && OB_FAIL(on_role_change(ls_id))) {
|
||||
CLOG_LOG(WARN, "retry submit role change event failed", K(ls_id), K(retry_ctx));
|
||||
}
|
||||
}
|
||||
|
||||
int ObRoleChangeService::on_role_change(const int64_t id)
|
||||
@ -296,7 +301,8 @@ int ObRoleChangeService::push_event_into_queue_(const RoleChangeEvent &event)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event)
|
||||
int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
@ -328,13 +334,13 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event)
|
||||
break;
|
||||
case RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE:
|
||||
CLOG_LOG(INFO, "begin log handler role change", K(curr_access_mode), K(event), KPC(ls));
|
||||
if (OB_FAIL(handle_role_change_cb_event_for_log_handler_(curr_access_mode, ls))) {
|
||||
if (OB_FAIL(handle_role_change_cb_event_for_log_handler_(curr_access_mode, ls, retry_ctx))) {
|
||||
CLOG_LOG(WARN, "handle_role_change_cb_event_for_log_handler_ failed", K(ret),
|
||||
K(curr_access_mode), KPC(ls));
|
||||
}
|
||||
CLOG_LOG(INFO, "end log handler role change", K(ret), K(curr_access_mode), K(event), KPC(ls));
|
||||
CLOG_LOG(INFO, "end log handler role change", K(ret), K(curr_access_mode), K(event), KPC(ls), K(retry_ctx));
|
||||
CLOG_LOG(INFO, "begin restore handler role change", K(curr_access_mode), K(event), KPC(ls));
|
||||
if (OB_FAIL(handle_role_change_cb_event_for_restore_handler_(curr_access_mode, ls))) {
|
||||
if (!retry_ctx.need_retry() && OB_FAIL(handle_role_change_cb_event_for_restore_handler_(curr_access_mode, ls))) {
|
||||
CLOG_LOG(WARN, "handle_role_change_cb_event_for_restore_handler_ failed", K(ret),
|
||||
K(curr_access_mode), KPC(ls));
|
||||
}
|
||||
@ -345,7 +351,7 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event)
|
||||
CLOG_LOG(WARN, "unexpected role change event type", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ls)) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ls) && !retry_ctx.need_retry()) {
|
||||
(void)ls->report_replica_info();
|
||||
}
|
||||
return ret;
|
||||
@ -421,7 +427,8 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_(
|
||||
|
||||
int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
|
||||
const AccessMode &curr_access_mode,
|
||||
ObLS *ls)
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool log_handler_is_offline = ls->get_log_handler()->is_offline();
|
||||
@ -459,14 +466,14 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
|
||||
break;
|
||||
// follower -> follower
|
||||
case RoleChangeOptType::FOLLOWER_2_LEADER:
|
||||
if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls))) {
|
||||
if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
|
||||
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(curr_role),
|
||||
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
||||
}
|
||||
break;
|
||||
// leader -> leader
|
||||
case RoleChangeOptType::LEADER_2_LEADER:
|
||||
if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls))) {
|
||||
if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls, retry_ctx))) {
|
||||
CLOG_LOG(WARN, "switch_leader_to_leader_ failed", K(ret), K(curr_role),
|
||||
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
||||
}
|
||||
@ -549,7 +556,8 @@ int ObRoleChangeService::handle_change_leader_event_for_log_handler_(
|
||||
|
||||
int ObRoleChangeService::switch_follower_to_leader_(
|
||||
const int64_t new_proposal_id,
|
||||
ObLS *ls)
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObRole new_role = LEADER;
|
||||
@ -566,8 +574,12 @@ int ObRoleChangeService::switch_follower_to_leader_(
|
||||
// We must guarantee that 'replay_service_' has replayed complete data, and before
|
||||
// stop 'replay_service_', other components can not submit log.
|
||||
} else if (FALSE_IT(time_guard.click("wait_replay_service_apply_done_"))
|
||||
|| OB_FAIL(wait_replay_service_replay_done_(ls_id, end_lsn))) {
|
||||
CLOG_LOG(WARN, "wait_replay_service_replay_done_ failed", K(ret), K(end_lsn));
|
||||
|| OB_FAIL(wait_replay_service_replay_done_(ls_id, end_lsn, WAIT_REPLAY_DONE_TIMEOUT_US))) {
|
||||
if (need_retry_submit_role_change_event_(ret)) {
|
||||
retry_ctx.set_retry_reason(RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT);
|
||||
} else {
|
||||
CLOG_LOG(WARN, "wait_replay_service_replay_done_ failed", K(ret), K(end_lsn));
|
||||
}
|
||||
} else if (FALSE_IT(time_guard.click("apply_service->switch_to_leader"))
|
||||
|| OB_FAIL(apply_service_->switch_to_leader(ls_id, new_proposal_id))) {
|
||||
CLOG_LOG(WARN, "apply_service_ switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id));
|
||||
@ -584,7 +596,7 @@ int ObRoleChangeService::switch_follower_to_leader_(
|
||||
ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE);
|
||||
CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(ret) && !retry_ctx.need_retry()) {
|
||||
log_handler->revoke_leader();
|
||||
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), KPC(ls));
|
||||
}
|
||||
@ -696,7 +708,8 @@ int ObRoleChangeService::switch_leader_to_follower_gracefully_(
|
||||
int ObRoleChangeService::switch_leader_to_leader_(
|
||||
const int64_t new_proposal_id,
|
||||
const int64_t curr_proposal_id,
|
||||
ObLS *ls)
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeGuard time_guard("switch_leader_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME);
|
||||
@ -704,7 +717,7 @@ int ObRoleChangeService::switch_leader_to_leader_(
|
||||
|| OB_FAIL(switch_leader_to_follower_forcedly_(curr_proposal_id, ls))) {
|
||||
CLOG_LOG(WARN, "switch_leader_to_leader_, switch leader to follower failed", K(ret), KPC(ls));
|
||||
} else if (FALSE_IT(time_guard.click("switch_follower_to_leader_"))
|
||||
|| OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls))) {
|
||||
|| OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
|
||||
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(new_proposal_id));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "switch_leader_to_leader_ success", K(ret), KPC(ls));
|
||||
@ -813,16 +826,18 @@ int ObRoleChangeService::switch_leader_to_leader_restore_(
|
||||
|
||||
int ObRoleChangeService::wait_replay_service_replay_done_(
|
||||
const share::ObLSID &ls_id,
|
||||
const palf::LSN &end_lsn)
|
||||
const palf::LSN &end_lsn,
|
||||
const int64_t timeout_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_done = false;
|
||||
const int64_t start_ts = ObTimeUtility::current_time();
|
||||
while (OB_SUCC(ret) && false == is_done) {
|
||||
palf::TimeoutChecker not_timeout(timeout_us);
|
||||
while (OB_SUCC(ret) && false == is_done && OB_SUCC(not_timeout())) {
|
||||
if (OB_FAIL(replay_service_->is_replay_done(ls_id, end_lsn, is_done))) {
|
||||
CLOG_LOG(WARN, "replay_service_ is_replay_done failed", K(ret), K(is_done), K(end_lsn));
|
||||
} else if (false == is_done) {
|
||||
ob_usleep(5*1000);
|
||||
ob_usleep(50*1000);
|
||||
CLOG_LOG(INFO, "wait replay done return false, need retry", K(ls_id), K(end_lsn), K(start_ts));
|
||||
} else {
|
||||
}
|
||||
@ -959,5 +974,14 @@ bool ObRoleChangeService::is_raw_write_or_flashback_mode(const AccessMode &mode)
|
||||
AccessMode::PREPARE_FLASHBACK == mode);
|
||||
}
|
||||
|
||||
bool ObRoleChangeService::need_retry_submit_role_change_event_(int ret) const
|
||||
{
|
||||
bool bool_ret = false;
|
||||
if (OB_TIMEOUT == ret) {
|
||||
bool_ret = true;
|
||||
}
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
} // end namespace logservice
|
||||
} // end namespace oceanbase
|
||||
|
||||
@ -82,10 +82,44 @@ public:
|
||||
int on_need_change_leader(const int64_t ls_id, const common::ObAddr &dst_addr) final override;
|
||||
int diagnose(RCDiagnoseInfo &diagnose_info);
|
||||
|
||||
private:
|
||||
enum class RoleChangeOptType {
|
||||
INVALID_RC_OPT_TYPE = 0,
|
||||
FOLLOWER_2_LEADER = 1,
|
||||
LEADER_2_FOLLOWER = 2,
|
||||
FOLLOWER_2_FOLLOWER = 3,
|
||||
LEADER_2_LEADER = 4,
|
||||
MAX_RC_OPT_TYPE = 5
|
||||
};
|
||||
enum class RetrySubmitRoleChangeEventReason {
|
||||
INVALID_TYPE = 0,
|
||||
WAIT_REPLAY_DONE_TIMEOUT = 1,
|
||||
MAX_TYPE = 2
|
||||
};
|
||||
class RetrySubmitRoleChangeEventCtx {
|
||||
public:
|
||||
RetrySubmitRoleChangeEventCtx() : reason_(RetrySubmitRoleChangeEventReason::INVALID_TYPE) {}
|
||||
~RetrySubmitRoleChangeEventCtx()
|
||||
{
|
||||
reason_ = RetrySubmitRoleChangeEventReason::INVALID_TYPE;
|
||||
}
|
||||
bool need_retry() const
|
||||
{
|
||||
return RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT == reason_;
|
||||
}
|
||||
void set_retry_reason(const RetrySubmitRoleChangeEventReason &reason)
|
||||
{
|
||||
reason_ = reason;
|
||||
}
|
||||
TO_STRING_KV(K_(reason));
|
||||
private:
|
||||
RetrySubmitRoleChangeEventReason reason_;
|
||||
};
|
||||
private:
|
||||
int submit_role_change_event_(const RoleChangeEvent &event);
|
||||
int push_event_into_queue_(const RoleChangeEvent &event);
|
||||
int handle_role_change_event_(const RoleChangeEvent &event);
|
||||
int handle_role_change_event_(const RoleChangeEvent &event,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx);
|
||||
|
||||
int handle_role_change_cb_event_for_restore_handler_(const palf::AccessMode &curr_access_mode,
|
||||
ObLS *ls);
|
||||
@ -93,25 +127,28 @@ private:
|
||||
ObLS *ls);
|
||||
|
||||
int handle_role_change_cb_event_for_log_handler_(const palf::AccessMode &curr_access_mode,
|
||||
ObLS *ls);
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx);
|
||||
int handle_change_leader_event_for_log_handler_(const common::ObAddr &dst_addr,
|
||||
ObLS *ls);
|
||||
|
||||
// retval
|
||||
// - OB_SUCCESS
|
||||
// - OB_TIMEOUT, means wait replay finish timeout.
|
||||
int switch_follower_to_leader_(const int64_t new_proposal_id,
|
||||
ObLS *ls);
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx);
|
||||
int switch_leader_to_follower_forcedly_(const int64_t new_proposal_id,
|
||||
ObLS *ls);
|
||||
int switch_leader_to_follower_gracefully_(const int64_t new_proposal_id,
|
||||
const int64_t curr_proposal_id,
|
||||
const common::ObAddr &dst_addr,
|
||||
ObLS *ls);
|
||||
int switch_leader_to_leader_(const common::ObRole &new_role,
|
||||
const int64_t new_proposal_id,
|
||||
ObLS *ls);
|
||||
int switch_follower_to_follower_(const int64_t new_proposal_id, ObLS *ls);
|
||||
int switch_leader_to_leader_(const int64_t new_proposal_id,
|
||||
const int64_t curr_proposal_id,
|
||||
ObLS *ls);
|
||||
ObLS *ls,
|
||||
RetrySubmitRoleChangeEventCtx &retry_ctx);
|
||||
|
||||
int switch_follower_to_leader_restore_(const int64_t new_proposal_id,
|
||||
ObLS *ls);
|
||||
@ -124,8 +161,10 @@ private:
|
||||
int switch_leader_to_leader_restore_(const int64_t new_proposal_id,
|
||||
const int64_t curr_proposal_id,
|
||||
ObLS *ls);
|
||||
// wait replay finish with timeout.
|
||||
int wait_replay_service_replay_done_(const share::ObLSID &ls_id,
|
||||
const palf::LSN &end_lsn);
|
||||
const palf::LSN &end_lsn,
|
||||
const int64_t timeout_us);
|
||||
int wait_apply_service_apply_done_(const share::ObLSID &ls_id,
|
||||
palf::LSN &end_lsn);
|
||||
int wait_apply_service_apply_done_when_change_leader_(const ObLogHandler *log_handler,
|
||||
@ -142,17 +181,13 @@ private:
|
||||
bool is_append_mode(const palf::AccessMode &access_mode) const;
|
||||
bool is_raw_write_or_flashback_mode(const palf::AccessMode &access_mode) const;
|
||||
private:
|
||||
enum class RoleChangeOptType {
|
||||
INVALID_RC_OPT_TYPE = 0,
|
||||
FOLLOWER_2_LEADER = 1,
|
||||
LEADER_2_FOLLOWER = 2,
|
||||
FOLLOWER_2_FOLLOWER = 3,
|
||||
LEADER_2_LEADER = 4,
|
||||
MAX_RC_OPT_TYPE = 5
|
||||
};
|
||||
RoleChangeOptType get_role_change_opt_type_(const common::ObRole &old_role,
|
||||
const common::ObRole &new_role,
|
||||
const bool need_transform_by_access_mode) const;
|
||||
// retry submit role change event
|
||||
// NB: nowdays, we only support retry submit role change event when wait replay finished
|
||||
// timeout.
|
||||
bool need_retry_submit_role_change_event_(int ret) const;
|
||||
public:
|
||||
static const int64_t MAX_THREAD_NUM = 1;
|
||||
static const int64_t MAX_RC_EVENT_TASK = 1024 * 1024;
|
||||
@ -160,6 +195,7 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRoleChangeService);
|
||||
private:
|
||||
static constexpr int64_t EACH_ROLE_CHANGE_COST_MAX_TIME = 1 * 1000 * 1000;
|
||||
static constexpr int64_t WAIT_REPLAY_DONE_TIMEOUT_US = 2 * 1000 * 1000;
|
||||
storage::ObLSService *ls_service_;
|
||||
logservice::ObLogApplyService *apply_service_;
|
||||
logservice::ObILogReplayService *replay_service_;
|
||||
|
||||
Reference in New Issue
Block a user