748 lines
30 KiB
C++
748 lines
30 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include "role_coordinator.h"
|
|
#include "common/ob_role.h"
|
|
#include "lib/ob_define.h"
|
|
#include "lib/ob_errno.h"
|
|
#include "lib/time/ob_time_utility.h"
|
|
#include "lib/utility/ob_macro_utils.h"
|
|
#include "lib/thread/thread_mgr.h"
|
|
#include "logservice/palf/log_define.h"
|
|
#include "share/ob_errno.h"
|
|
#include "share/ob_ls_id.h"
|
|
#include "share/ob_thread_define.h"
|
|
#include "share/rc/ob_tenant_base.h"
|
|
#include "storage/tx_storage/ob_ls_service.h"
|
|
#include "storage/tx_storage/ob_ls_handle.h"
|
|
#include "share/ob_occam_time_guard.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
using namespace common;
|
|
using namespace palf;
|
|
using namespace logservice;
|
|
using namespace share;
|
|
namespace palfcluster
|
|
{
|
|
|
|
// Event carrying only a type and a log stream id; 'dst_addr_' keeps its
// default-constructed value (on_role_change() builds events this way).
RoleChangeEvent::RoleChangeEvent(const RoleChangeEventType &event_type,
                                 const share::ObLSID &ls_id)
  : event_type_(event_type),
    ls_id_(ls_id)
{
}

// Event that additionally records the destination address of a leadership
// transfer (on_need_change_leader() builds events this way).
RoleChangeEvent::RoleChangeEvent(const RoleChangeEventType &event_type,
                                 const share::ObLSID &ls_id,
                                 const common::ObAddr &dst_addr)
  : event_type_(event_type),
    ls_id_(ls_id),
    dst_addr_(dst_addr)
{
}
|
|
|
|
bool RoleChangeEvent::is_valid() const
|
|
{
|
|
return RoleChangeEventType::INVALID_RC_EVENT_TYPE != event_type_
|
|
&& false != ls_id_.is_valid();
|
|
}
|
|
|
|
void RoleChangeEvent::reset()
|
|
{
|
|
event_type_ = RoleChangeEventType::INVALID_RC_EVENT_TYPE;
|
|
ls_id_.reset();
|
|
dst_addr_.reset();
|
|
}
|
|
|
|
bool RoleChangeEvent::operator==(const RoleChangeEvent &rhs) const
|
|
{
|
|
// for change leader event, we just check 'ls_id'.
|
|
return event_type_ == rhs.event_type_ && ls_id_ == rhs.ls_id_;
|
|
}
|
|
|
|
// Nothing to set up: the lock and the event slots rely on their own
// constructors.
RoleChangeEventSet::RoleChangeEventSet()
{
}

// Nothing to release: the set owns no dynamically allocated state here.
RoleChangeEventSet::~RoleChangeEventSet()
{
}
|
|
|
|
int RoleChangeEventSet::insert(const RoleChangeEvent &event)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t free_idx = -1;
|
|
ObSpinLockGuard guard(lock_);
|
|
for (int64_t i = 0; i < MAX_ARRAY_SIZE; i++) {
|
|
if (event == events_[i]) {
|
|
ret = OB_ENTRY_EXIST;
|
|
}
|
|
}
|
|
for (int64_t i = 0; i < MAX_ARRAY_SIZE && -1 == free_idx && OB_SUCC(ret); i++) {
|
|
if (false == events_[i].is_valid()) {
|
|
free_idx = i;
|
|
}
|
|
}
|
|
if (OB_ENTRY_EXIST == ret) {
|
|
} else if (-1 != free_idx) {
|
|
events_[free_idx] = event;
|
|
} else {
|
|
ret = OB_SIZE_OVERFLOW;
|
|
}
|
|
CLOG_LOG(INFO, "insert event into set success", K(ret), K(event), K(free_idx));
|
|
return ret;
|
|
}
|
|
|
|
int RoleChangeEventSet::remove(const RoleChangeEvent &event)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t delete_idx = -1;
|
|
ObSpinLockGuard guard(lock_);
|
|
for (int64_t i = 0; i < MAX_ARRAY_SIZE && -1 == delete_idx; i++) {
|
|
if (event == events_[i]) {
|
|
delete_idx = i;
|
|
}
|
|
};
|
|
if (-1 != delete_idx) {
|
|
events_[delete_idx].reset();
|
|
} else {
|
|
ret = OB_ENTRY_NOT_EXIST;
|
|
}
|
|
CLOG_LOG(INFO, "remove slog from set success", K(ret), K(delete_idx), K(event));
|
|
return ret;
|
|
}
|
|
|
|
// Builds an uninitialized coordinator: no collaborators bound and no thread
// group yet. init() must succeed before start() can be used.
RoleCoordinator::RoleCoordinator()
  : log_client_map_(NULL),
    apply_service_(NULL),
    tg_id_(-1),
    is_inited_(false)
{
}
|
|
|
|
// destroy() already does nothing unless the coordinator was initialized, so
// the destructor simply delegates to it.
RoleCoordinator::~RoleCoordinator()
{
  destroy();
}
|
|
|
|
// Binds the coordinator to its collaborators and creates, but does not start,
// the RCService thread group. Call start() afterwards.
//
// @param[in] log_client_map  lookup table from log stream id to ObLogClient; must not be NULL.
// @param[in] apply_service   apply service switched on role changes; must not be NULL.
// @return OB_INIT_TWICE when already initialized, OB_INVALID_ARGUMENT on a NULL
//         argument, or the error returned by TG_CREATE_TENANT.
int RoleCoordinator::init(palfcluster::LogClientMap *log_client_map,
                          logservice::ObLogApplyService *apply_service)
{
  int ret = OB_SUCCESS;
  const int tg_id = lib::TGDefIDs::RCService;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    CLOG_LOG(WARN, "RoleCoordinator init twice", K(ret), K(tg_id_));
  } else if (OB_ISNULL(log_client_map) || OB_ISNULL(apply_service)) {
    ret = OB_INVALID_ARGUMENT;
    // Log both checked pointers so the offending one is visible.
    CLOG_LOG(WARN, "invalid argument", K(ret), KP(log_client_map), KP(apply_service));
  } else if (OB_FAIL(TG_CREATE_TENANT(tg_id, tg_id_))) {
    CLOG_LOG(WARN, "RoleCoordinator TG_CREATE failed", K(ret));
  } else {
    apply_service_ = apply_service;
    log_client_map_ = log_client_map;
    is_inited_ = true;
    CLOG_LOG(INFO, "RoleCoordinator init success", K(ret), K(tg_id_), KP(log_client_map), KP(apply_service));
  }
  return ret;
}
|
|
|
|
int RoleCoordinator::start()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (IS_NOT_INIT) {
|
|
ret = OB_NOT_INIT;
|
|
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
|
|
CLOG_LOG(WARN, "RoleCoordinator start failed", K(ret), K(tg_id_));
|
|
} else {
|
|
CLOG_LOG(INFO, "RoleCoordinator start success", K(ret), K(tg_id_));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void RoleCoordinator::wait()
|
|
{
|
|
if (IS_INIT) {
|
|
TG_STOP(tg_id_);
|
|
TG_WAIT(tg_id_);
|
|
}
|
|
CLOG_LOG(INFO, "RoleCoordinator wait finish", K(tg_id_));
|
|
}
|
|
|
|
void RoleCoordinator::stop()
|
|
{
|
|
if (IS_INIT) {
|
|
TG_STOP(tg_id_);
|
|
}
|
|
CLOG_LOG(INFO, "RoleCoordinator stop finish", K(tg_id_));
|
|
}
|
|
|
|
void RoleCoordinator::destroy()
|
|
{
|
|
if (IS_INIT) {
|
|
(void)stop();
|
|
(void)wait();
|
|
TG_DESTROY(tg_id_);
|
|
is_inited_ = false;
|
|
tg_id_ = -1;
|
|
apply_service_ = NULL;
|
|
CLOG_LOG(INFO, "RoleCoordinator destroy success");
|
|
}
|
|
}
|
|
|
|
void RoleCoordinator::handle(void *task)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
// When role chage service hang exceeds 30 seconds, we think there is dead lock in 'handle_role_change_event_',
|
|
// TIMEGUARD will pring lbt().
|
|
TIMEGUARD_INIT(CLOG, 30_s, 30_s);
|
|
RoleChangeEvent *event = reinterpret_cast<RoleChangeEvent*>(task);
|
|
const int64_t ls_id = event->ls_id_.id();
|
|
const int64_t start_ts = ObTimeUtility::current_time();
|
|
RetrySubmitRoleChangeEventCtx retry_ctx;
|
|
CLOG_LOG(INFO, "begin handle_role_change_event_", "sequence:", start_ts, KPC(event));
|
|
if (NULL == event) {
|
|
CLOG_LOG(WARN, "unexpected error, task is nullptr", KP(event));
|
|
} else if (OB_FAIL(handle_role_change_event_(*event, retry_ctx))) {
|
|
CLOG_LOG(WARN, "handle_role_change_event_ failed", K(ret), KPC(event), K(retry_ctx));
|
|
} else {
|
|
CLOG_LOG(INFO, "end handle_role_change_event_", "sequence:", start_ts, KPC(event));
|
|
}
|
|
if (NULL != event) {
|
|
OB_DELETE(RoleChangeEvent, "RCService", event);
|
|
}
|
|
if (retry_ctx.need_retry() && OB_FAIL(on_role_change(ls_id))) {
|
|
CLOG_LOG(WARN, "retry submit role change event failed", K(ls_id), K(retry_ctx));
|
|
}
|
|
}
|
|
|
|
int RoleCoordinator::on_role_change(const int64_t id)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
share::ObLSID ls_id(id);
|
|
RoleChangeEvent event(RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE, ls_id);
|
|
// TODO by runlin: if task queue has been full, push task will be failed, the role change event
|
|
// will be lost.
|
|
if (OB_FAIL(submit_role_change_event_(event))) {
|
|
CLOG_LOG(WARN, "submit_role_change_event_ failed", K(ret), K(event));
|
|
} else {
|
|
CLOG_LOG(INFO, "on_role_change success", K(ret), K(event));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Queues a CHANGE_LEADER_EVENT_TYPE event asking that leadership of log stream
// 'ls_id' be moved to 'dst_addr'.
int RoleCoordinator::on_need_change_leader(const int64_t ls_id, const common::ObAddr &dst_addr)
{
  int ret = OB_SUCCESS;
  const share::ObLSID target_ls(ls_id);
  const RoleChangeEvent event(RoleChangeEventType::CHANGE_LEADER_EVENT_TYPE, target_ls, dst_addr);
  ret = submit_role_change_event_(event);
  if (OB_SUCCESS == ret) {
    CLOG_LOG(INFO, "change_leader success", K(ret), K(event));
  } else {
    CLOG_LOG(WARN, "submit_role_change_event_ failed", K(ret), K(event));
  }
  return ret;
}
|
|
|
|
int RoleCoordinator::submit_role_change_event_(const RoleChangeEvent &event)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if(OB_FAIL(rc_set_.insert(event)) && OB_ENTRY_EXIST != ret) {
|
|
CLOG_LOG(ERROR, "insert into rc_set failed", K(ret), K(event));
|
|
} else if (OB_ENTRY_EXIST == ret) {
|
|
CLOG_LOG(INFO, "repeat role change event, filter it", K(ret), K(event));
|
|
ret = OB_SUCCESS;
|
|
} else if (OB_FAIL(push_event_into_queue_(event))) {
|
|
CLOG_LOG(WARN, "push_event_into_queue_ failed", K(ret), K(event));
|
|
} else {
|
|
CLOG_LOG(INFO, "submit_role_change_event_ success", K(ret), K(event));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// TODO: use poll to avoid alloc memory failed.
|
|
int RoleCoordinator::push_event_into_queue_(const RoleChangeEvent &event)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
RoleChangeEvent *rc_event = NULL;
|
|
|
|
int64_t warn_time = OB_INVALID_TIMESTAMP;
|
|
do {
|
|
if (NULL == (rc_event =
|
|
MTL_NEW(RoleChangeEvent, "RCService", event.event_type_, event.ls_id_, event.dst_addr_))) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
if (palf_reach_time_interval(1 * 1000 * 1000, warn_time)) {
|
|
CLOG_LOG(WARN, "allocate memory failed", K(ret), K(event));
|
|
}
|
|
usleep(1 * 1000);
|
|
} else {
|
|
ret = OB_SUCCESS;
|
|
}
|
|
} while(OB_FAIL(ret));
|
|
|
|
if (OB_FAIL(TG_PUSH_TASK(tg_id_, rc_event))) {
|
|
CLOG_LOG(WARN, "ObRoleChangeTask push task failed", K(ret), K(event));
|
|
}
|
|
if (OB_FAIL(ret) && NULL != rc_event) {
|
|
MTL_DELETE(RoleChangeEvent, "RCService", rc_event);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::handle_role_change_event_(const RoleChangeEvent &event,
|
|
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObLogClient *ls = nullptr;
|
|
AccessMode curr_access_mode;
|
|
int64_t unused_mode_version;
|
|
OB_ASSERT(OB_SUCCESS == rc_set_.remove(event));
|
|
if (false == event.is_valid()) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
CLOG_LOG(WARN, "invalid argument", K(event));
|
|
} else if (OB_FAIL(log_client_map_->get(event.ls_id_, ls) || OB_ISNULL(ls))) {
|
|
CLOG_LOG(ERROR, "not exist loghandler", K(ret), K(event.ls_id_), KP(ls));
|
|
ret = OB_ERR_UNEXPECTED;
|
|
} else if (OB_FAIL(ls->get_log_handler()->get_access_mode(unused_mode_version, curr_access_mode))) {
|
|
CLOG_LOG(WARN, "ObLogHandler get_access_mode failed", K(ret));
|
|
} else {
|
|
switch (event.event_type_) {
|
|
case RoleChangeEventType::CHANGE_LEADER_EVENT_TYPE:
|
|
CLOG_LOG(INFO, "begin change leader", K(curr_access_mode), K(event), KPC(ls));
|
|
if (is_append_mode(curr_access_mode)
|
|
&& OB_FAIL(handle_change_leader_event_for_log_handler_(event.dst_addr_, ls))) {
|
|
CLOG_LOG(WARN, "ObLogHandler change leader failed", K(ret), K(event), KPC(ls));
|
|
}
|
|
CLOG_LOG(INFO, "end change leader", K(ret), K(curr_access_mode), K(event), KPC(ls));
|
|
break;
|
|
case RoleChangeEventType::ROLE_CHANGE_CB_EVENT_TYPE:
|
|
CLOG_LOG(INFO, "begin log handler role change", K(curr_access_mode), K(event), KPC(ls));
|
|
if (OB_FAIL(handle_role_change_cb_event_for_log_handler_(curr_access_mode, ls, retry_ctx))) {
|
|
CLOG_LOG(WARN, "handle_role_change_cb_event_for_log_handler_ failed", K(ret),
|
|
K(curr_access_mode), KPC(ls));
|
|
}
|
|
CLOG_LOG(INFO, "end log handler role change", K(ret), K(curr_access_mode), K(event), KPC(ls), K(retry_ctx));
|
|
break;
|
|
default:
|
|
ret = OB_ERR_UNEXPECTED;
|
|
CLOG_LOG(WARN, "unexpected role change event type", K(ret));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::handle_role_change_cb_event_for_log_handler_(
|
|
const AccessMode &curr_access_mode,
|
|
ObLogClient*ls,
|
|
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const bool log_handler_is_offline = ls->get_log_handler()->is_offline();
|
|
|
|
// If log handler is offline, need execute LEADER_2_FOLLOWER or FOLLOWER_2_FOLLOWER
|
|
//
|
|
// when access mode is APPEND, log_handler need execute leader to follower or
|
|
// follower to leader. otherwise, only need execute leader to follower or follower
|
|
// to follower, therefore, we set 'need_transform_by_access_mode' to false when
|
|
// 'curr_access_mode' is APPEND.
|
|
const bool only_need_change_to_follower = !is_append_mode(curr_access_mode) || log_handler_is_offline;
|
|
RoleChangeOptType opt_type;
|
|
ObRole curr_role = ObRole::INVALID_ROLE;
|
|
ObRole new_role = ObRole::INVALID_ROLE;
|
|
bool is_pending_state = false;
|
|
int64_t curr_proposal_id = -1;
|
|
int64_t new_proposal_id = -1;
|
|
if (OB_FAIL(ls->get_log_handler()->prepare_switch_role(curr_role,
|
|
curr_proposal_id, new_role, new_proposal_id, is_pending_state))) {
|
|
CLOG_LOG(WARN, "ObLogHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
|
|
K(new_role), K(new_proposal_id));
|
|
} else if (false == need_execute_role_change(curr_proposal_id, curr_role, new_proposal_id,
|
|
new_role, is_pending_state, log_handler_is_offline)) {
|
|
CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id),
|
|
K(new_role), K(new_proposal_id), K(is_pending_state), K(log_handler_is_offline));
|
|
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, only_need_change_to_follower))) {
|
|
} else {
|
|
switch (opt_type) {
|
|
// leader -> follower
|
|
case RoleChangeOptType::LEADER_2_FOLLOWER:
|
|
if (OB_FAIL(switch_leader_to_follower_forcedly_(new_proposal_id, ls))) {
|
|
CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
|
}
|
|
break;
|
|
// follower -> follower
|
|
case RoleChangeOptType::FOLLOWER_2_LEADER:
|
|
if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
|
|
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
|
}
|
|
break;
|
|
// leader -> leader
|
|
case RoleChangeOptType::LEADER_2_LEADER:
|
|
if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls, retry_ctx))) {
|
|
CLOG_LOG(WARN, "switch_leader_to_leader_ failed", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
|
}
|
|
break;
|
|
// follower -> follower
|
|
case RoleChangeOptType::FOLLOWER_2_FOLLOWER:
|
|
if (OB_FAIL(switch_follower_to_follower_(new_proposal_id, ls))) {
|
|
CLOG_LOG(WARN, "switch_follower_to_follower_ failed", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
|
|
}
|
|
break;
|
|
default:
|
|
ret = OB_ERR_UNEXPECTED;
|
|
CLOG_LOG(ERROR, "unexpected error, can not handle role change", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(new_proposal_id), KPC(ls));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::handle_change_leader_event_for_log_handler_(
|
|
const common::ObAddr &dst_addr,
|
|
ObLogClient*ls)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObRole curr_role = ObRole::INVALID_ROLE;
|
|
ObRole new_role = ObRole::INVALID_ROLE;
|
|
bool is_pending_state = false;
|
|
int64_t curr_proposal_id = -1;
|
|
int64_t new_proposal_id = -1;
|
|
if (OB_FAIL(ls->get_log_handler()->prepare_switch_role(curr_role,
|
|
curr_proposal_id, new_role, new_proposal_id, is_pending_state))) {
|
|
CLOG_LOG(WARN, "ObLogHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
|
|
K(new_role), K(new_proposal_id));
|
|
} else if (true == is_pending_state
|
|
|| curr_proposal_id != new_proposal_id || LEADER != curr_role || LEADER != new_role) {
|
|
// when log handler is not LEDAER, we also need execute change_leader_to, otherwise, the leader can not be changed by election.
|
|
ls->get_log_handler()->change_leader_to(dst_addr);
|
|
CLOG_LOG(INFO, "no need execute switch_leader_to_follower_gracefully, change leader directlly",
|
|
K(ret), K(is_pending_state), K(curr_proposal_id), K(new_proposal_id), K(curr_role), K(new_role));
|
|
} else if (OB_FAIL(switch_leader_to_follower_gracefully_(new_proposal_id, curr_proposal_id,
|
|
dst_addr, ls))) {
|
|
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully_ failed", K(ret), KPC(ls),
|
|
K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id));
|
|
} else {
|
|
CLOG_LOG(INFO, "handle_change_leader_event_for_log_handler_ success", K(ret), K(curr_role),
|
|
K(curr_proposal_id), K(new_role), K(new_proposal_id), K(dst_addr));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::switch_follower_to_leader_(
|
|
const int64_t new_proposal_id,
|
|
ObLogClient*ls,
|
|
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const ObRole new_role = LEADER;
|
|
const share::ObLSID &ls_id = ls->get_ls_id();
|
|
palf::LSN end_lsn;
|
|
ObTimeGuard time_guard("switch_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME);
|
|
ObLogHandler *log_handler = ls->get_log_handler();
|
|
// ObRoleChangeHandler *role_change_handler = ls->get_role_change_handler();
|
|
if (OB_FAIL(log_handler->get_end_lsn(end_lsn))) {
|
|
CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(ls));
|
|
// NB: order is vital!!!
|
|
// We must guarantee that 'replay_service_' has replayed complete data, and before
|
|
// stop 'replay_service_', other components can not submit log.
|
|
} else if (FALSE_IT(time_guard.click("wait_replay_service_apply_done_"))
|
|
|| OB_FAIL(wait_replay_service_replay_done_(ls_id, end_lsn, WAIT_REPLAY_DONE_TIMEOUT_US))) {
|
|
if (need_retry_submit_role_change_event_(ret)) {
|
|
retry_ctx.set_retry_reason(RetrySubmitRoleChangeEventReason::WAIT_REPLAY_DONE_TIMEOUT);
|
|
} else {
|
|
CLOG_LOG(WARN, "wait_replay_service_replay_done_ failed", K(ret), K(end_lsn));
|
|
}
|
|
} else if (FALSE_IT(time_guard.click("apply_service->switch_to_leader"))
|
|
|| OB_FAIL(apply_service_->switch_to_leader(ls_id, new_proposal_id))) {
|
|
CLOG_LOG(WARN, "apply_service_ switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id));
|
|
// } else if (FALSE_IT(time_guard.click("replay_service->switch_to_leader"))
|
|
// || OB_FAIL(replay_service_->switch_to_leader(ls_id))) {
|
|
} else if (FALSE_IT(log_handler->switch_role(new_role, new_proposal_id))) {
|
|
CLOG_LOG(WARN, "ObLogHandler switch role failed", K(ret), K(new_role), K(new_proposal_id));
|
|
// } else if (FALSE_IT(time_guard.click("role_change_handler->switch_to_leader"))
|
|
// || OB_FAIL(role_change_handler->switch_to_leader())) {
|
|
// CLOG_LOG(WARN, "ObRoleChangeHandler switch_to_leader failed", K(ret), KPC(ls));
|
|
} else {
|
|
CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls));
|
|
}
|
|
if (OB_FAIL(ret) && !retry_ctx.need_retry()) {
|
|
log_handler->change_leader_to(GCTX.self_addr());
|
|
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), KPC(ls));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::switch_leader_to_follower_forcedly_(
|
|
const int64_t new_proposal_id,
|
|
ObLogClient*ls)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const ObRole new_role = FOLLOWER;
|
|
const share::ObLSID &ls_id = ls->get_ls_id();
|
|
ObLogHandler *log_handler = ls->get_log_handler();
|
|
// ObRoleChangeHandler *role_change_handler = ls->get_role_change_handler();
|
|
palf::LSN end_lsn;
|
|
ObTimeGuard time_guard("switch_leader_to_follower_forcedly_", EACH_ROLE_CHANGE_COST_MAX_TIME);
|
|
|
|
// Why need wait_apply_sync?
|
|
//
|
|
// when we can execute 'switch_to_follower_forcedly', means that there is no possibility to submit log via log handler successfully.
|
|
// however, the flying callback may have not been pushed into apply service, and then, 'switch_to_follower' will be executed, for trans,
|
|
// if the callback be executed after 'switch_to_follower', will cause abort.
|
|
if (OB_FAIL(apply_service_->wait_append_sync(ls_id))) {
|
|
CLOG_LOG(WARN, "wait_apply_sync failed", K(ret), K(ls_id));
|
|
} else if (FALSE_IT(time_guard.click("apply_service->wait_apply_sync"))
|
|
|| OB_FAIL(apply_service_->switch_to_follower(ls_id))) {
|
|
CLOG_LOG(WARN, "apply_service_ switch_to_follower failed", K(ret), K(new_role), K(new_proposal_id));
|
|
} else if (FALSE_IT(time_guard.click("apply_service->switch_to_follower"))
|
|
|| OB_FAIL(wait_apply_service_apply_done_(ls_id, end_lsn))) {
|
|
CLOG_LOG(WARN, "wait_apply_service_apply_done_ failed", K(ret), K(end_lsn));
|
|
} else {
|
|
time_guard.click("wait_apply_service_apply_done_");
|
|
// role_change_handler->switch_to_follower_forcedly();
|
|
// NB: order is vital
|
|
// We must guarantee that this replica will not submit any logs after switch_role.
|
|
log_handler->switch_role(new_role, new_proposal_id);
|
|
// NB: in case of leader reovke, do we no need retry.
|
|
// (void)replay_service_->switch_to_follower(ls_id, end_lsn);
|
|
CLOG_LOG(INFO, "switch_leader_to_follower_forcedly_ success", K(ret), KPC(ls));
|
|
}
|
|
if (OB_FAIL(ret)) {
|
|
log_handler->change_leader_to(GCTX.self_addr());
|
|
CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(new_proposal_id), K(new_role));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::switch_leader_to_follower_gracefully_(
|
|
const int64_t new_proposal_id,
|
|
const int64_t curr_proposal_id,
|
|
const common::ObAddr &dst_addr,
|
|
ObLogClient*ls)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int tmp_ret = OB_SUCCESS;
|
|
const ObRole new_role = FOLLOWER;
|
|
const share::ObLSID &ls_id = ls->get_ls_id();
|
|
ObLogHandler *log_handler = ls->get_log_handler();
|
|
// ObRoleChangeHandler *role_change_handler = ls->get_role_change_handler();
|
|
LSN end_lsn;
|
|
ObTimeGuard time_guard("switch_leader_to_follower_gracefully_", EACH_ROLE_CHANGE_COST_MAX_TIME);
|
|
// 1. OB_SUCCESS means execute transaction successfully, we need execute follow steps.
|
|
// 2. OB_LS_NEED_REVOKE means the transaction execute failed, and can't been rollback, need revoke LS.
|
|
// 3. OTHERS, switch_to_follower_gracefully failed, and 'role_change_handler' has rollback success,
|
|
// no need to execute follow steps.
|
|
// if (FALSE_IT(time_guard.click("role_change_handler->switch_to_follower_gracefully"))
|
|
// || OB_SUCCESS != (tmp_ret = role_change_handler->switch_to_follower_gracefully())) {
|
|
// CLOG_LOG(WARN, "switch_to_follower_gracefully failed, need revoke leader", K(tmp_ret),
|
|
// K(new_role), K(new_proposal_id), K(dst_addr));
|
|
// NB: order is vital!!!
|
|
// we must ensure that the 'end_lsn' provid by 'apply_service_' is correctly.
|
|
// just switch_role to follower firstly, avoid sync log failed because palf has changed leader.
|
|
if (FALSE_IT(log_handler->switch_role(new_role, curr_proposal_id))) {
|
|
// apply service will not update end_lsn after switch_to_follower, so wait apply done first here
|
|
} else if (FALSE_IT(time_guard.click("wait_apply_service_apply_done_when_change_leader_"))
|
|
|| OB_FAIL(wait_apply_service_apply_done_when_change_leader_(log_handler, curr_proposal_id, ls_id, end_lsn))) {
|
|
CLOG_LOG(WARN, "wait_apply_service_apply_done_when_change_leader_ failed", K(ret),
|
|
K(new_role), K(new_proposal_id), K(dst_addr));
|
|
// wait apply service done my fail, we need :
|
|
// 1. switch log handler to origin status.
|
|
// 2. resume role change handler
|
|
log_handler->switch_role(LEADER, curr_proposal_id);
|
|
// if (OB_FAIL(role_change_handler->resume_to_leader())) {
|
|
// CLOG_LOG(WARN, "resume to leader failed", K(ret), KPC(ls));
|
|
// }
|
|
// NB: the following steps mustn't be failed.
|
|
} else if (FALSE_IT(time_guard.click("apply_service->switch_to_follower"))
|
|
|| OB_FAIL(apply_service_->switch_to_follower(ls_id))) {
|
|
CLOG_LOG(WARN, "apply_service_ switch_to_follower failed", K(ret), K(new_role), K(new_proposal_id), K(dst_addr));
|
|
// } else if (FALSE_IT(time_guard.click("replay_service->switch_to_follower"))
|
|
// || OB_FAIL(replay_service_->switch_to_follower(ls_id, end_lsn))) {
|
|
// CLOG_LOG(WARN, "replay_service_ switch_to_follower failed", K(ret), KPC(ls), K(new_role), K(new_proposal_id));
|
|
// NB: execute 'change_leader_to' lastly, can make 'wait_apply_service_apply_done_when_change_leader_' finish quickly.
|
|
} else if (OB_FAIL(log_handler->change_leader_to(dst_addr))) {
|
|
CLOG_LOG(WARN, "ObLogHandler change_leader failed", K(ret), K(new_role), K(new_proposal_id), K(dst_addr));
|
|
} else {
|
|
CLOG_LOG(INFO, "switch_to_follower_gracefully success", K(ret), K(new_role), K(new_proposal_id), K(dst_addr));
|
|
}
|
|
if (OB_FAIL(ret) || OB_LS_NEED_REVOKE == tmp_ret) {
|
|
log_handler->change_leader_to(GCTX.self_addr());
|
|
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully failed, revoke leader", K(ret), K(tmp_ret), K(dst_addr),
|
|
K(new_role), K(new_proposal_id));
|
|
ret = (OB_SUCCESS == ret ? tmp_ret : ret);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::switch_leader_to_leader_(
|
|
const int64_t new_proposal_id,
|
|
const int64_t curr_proposal_id,
|
|
ObLogClient*ls,
|
|
RetrySubmitRoleChangeEventCtx &retry_ctx)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObTimeGuard time_guard("switch_leader_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME);
|
|
if (FALSE_IT(time_guard.click("switch_leader_to_follower_forcedly_"))
|
|
|| OB_FAIL(switch_leader_to_follower_forcedly_(curr_proposal_id, ls))) {
|
|
CLOG_LOG(WARN, "switch_leader_to_leader_, switch leader to follower failed", K(ret), KPC(ls));
|
|
} else if (FALSE_IT(time_guard.click("switch_follower_to_leader_"))
|
|
|| OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
|
|
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(new_proposal_id));
|
|
} else {
|
|
CLOG_LOG(INFO, "switch_leader_to_leader_ success", K(ret), KPC(ls));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::switch_follower_to_follower_(const int64_t new_proposal_id, ObLogClient*ls)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
// need update proposal_id
|
|
const share::ObLSID &ls_id = ls->get_ls_id();
|
|
ObLogHandler *log_handler = ls->get_log_handler();
|
|
(void) log_handler->switch_role(common::ObRole::FOLLOWER, new_proposal_id);
|
|
CLOG_LOG(INFO, "switch_follower_to_follower_");
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::wait_replay_service_replay_done_(
|
|
const share::ObLSID &ls_id,
|
|
const palf::LSN &end_lsn,
|
|
const int64_t timeout_us)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::wait_apply_service_apply_done_(
|
|
const share::ObLSID &ls_id,
|
|
palf::LSN &end_lsn)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
bool is_done = false;
|
|
const int64_t start_ts = ObTimeUtility::current_time();
|
|
while (OB_SUCC(ret) && false == is_done) {
|
|
if (OB_FAIL(apply_service_->is_apply_done(ls_id, is_done, end_lsn))) {
|
|
CLOG_LOG(WARN, "apply_service_ is_apply_done failed", K(ret), K(is_done), K(end_lsn));
|
|
} else if (false == is_done) {
|
|
ob_usleep(5*1000);
|
|
CLOG_LOG(WARN, "wait apply done return false, need retry", K(ls_id), K(is_done), K(end_lsn), K(start_ts));
|
|
} else {
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int RoleCoordinator::wait_apply_service_apply_done_when_change_leader_(
|
|
const ObLogHandler *log_handler,
|
|
const int64_t proposal_id,
|
|
const share::ObLSID &ls_id,
|
|
palf::LSN &end_lsn)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
bool is_done = false;
|
|
palf::LSN max_lsn;
|
|
common::ObRole unused_curr_role;
|
|
int64_t unused_curr_proposal_id;
|
|
common::ObRole new_role;
|
|
int64_t new_proposal_id;
|
|
bool is_pending_state = false;
|
|
while (OB_SUCC(ret) && (false == is_done || end_lsn != max_lsn)) {
|
|
if (OB_FAIL(apply_service_->is_apply_done(ls_id, is_done, end_lsn))) {
|
|
CLOG_LOG(WARN, "apply_service_ is_apply_done failed", K(ret), K(is_done), K(end_lsn));
|
|
// NB: ApplyService execute on_failure only when it's FOLLOWER, therefore ApplyService my not return apply done
|
|
// when it's LEADER, we need check the role of palf when has changed.
|
|
} else if (OB_FAIL(log_handler->get_max_lsn(max_lsn))) {
|
|
CLOG_LOG(WARN, "get_end_lsn from palf failed", K(ret), K(ls_id), K(end_lsn));
|
|
} else if (OB_FAIL(log_handler->prepare_switch_role(unused_curr_role, unused_curr_proposal_id,
|
|
new_role, new_proposal_id, is_pending_state))) {
|
|
CLOG_LOG(WARN, "failed prepare_switch_role", K(ret), K(new_role), K(proposal_id), K(ls_id));
|
|
// if palf has changed role, return OB_STATE_NOT_MATCH, change leader failed.
|
|
} else if (LEADER != new_role || proposal_id != new_proposal_id) {
|
|
ret = OB_STATE_NOT_MATCH;
|
|
CLOG_LOG(WARN, "palf has changed leader, wait_apply_service_apply_done_when_change_leader_ failed", K(ret), K(proposal_id),
|
|
K(new_proposal_id));
|
|
} else if (false == is_done || end_lsn != max_lsn) {
|
|
CLOG_LOG(INFO, "wait apply done return false, need retry", K(ls_id), K(is_done),
|
|
K(end_lsn), K(max_lsn));
|
|
ob_usleep(5*1000);
|
|
} else {
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Maps an (old_role, new_role) pair to the transition to execute.
//
// When 'need_transform_by_access_mode' is true, only follower-side transitions
// are allowed: LEADER_2_FOLLOWER if old_role is LEADER, otherwise
// FOLLOWER_2_FOLLOWER, regardless of new_role. Otherwise every LEADER/FOLLOWER
// combination maps to its direct transition. Any other role pair yields
// INVALID_RC_OPT_TYPE.
RoleCoordinator::RoleChangeOptType RoleCoordinator::get_role_change_opt_type_(
    const ObRole &old_role,
    const ObRole &new_role,
    const bool need_transform_by_access_mode) const
{
  RoleChangeOptType opt_type = RoleChangeOptType::INVALID_RC_OPT_TYPE;
  const bool was_leader = (LEADER == old_role);
  if (need_transform_by_access_mode) {
    opt_type = was_leader ? RoleChangeOptType::LEADER_2_FOLLOWER
                          : RoleChangeOptType::FOLLOWER_2_FOLLOWER;
  } else if (was_leader) {
    if (LEADER == new_role) {
      opt_type = RoleChangeOptType::LEADER_2_LEADER;
    } else if (FOLLOWER == new_role) {
      opt_type = RoleChangeOptType::LEADER_2_FOLLOWER;
    }
  } else if (FOLLOWER == old_role) {
    if (LEADER == new_role) {
      opt_type = RoleChangeOptType::FOLLOWER_2_LEADER;
    } else if (FOLLOWER == new_role) {
      opt_type = RoleChangeOptType::FOLLOWER_2_FOLLOWER;
    }
  }
  return opt_type;
}
|
|
|
|
// NB:
|
|
// 1. when log handler is offline, need execute role change;
|
|
// 2. when palf is not in pending:
|
|
// 1. proposal_id is not same or
|
|
// 2. role is not same.(If there are no pending logs in sliding window,
|
|
// leader to follower will not advance proposal_id)
|
|
bool RoleCoordinator::need_execute_role_change(const int64_t curr_proposal_id,
|
|
const common::ObRole curr_role,
|
|
const int64_t new_proposal_id,
|
|
const common::ObRole new_role,
|
|
const bool is_pending_state,
|
|
const bool is_offline) const
|
|
{
|
|
return is_offline
|
|
|| (!is_pending_state
|
|
&& (curr_proposal_id != new_proposal_id || curr_role != new_role));
|
|
}
|
|
|
|
bool RoleCoordinator::is_append_mode(const AccessMode &mode) const
|
|
{
|
|
return (AccessMode::APPEND == mode);
|
|
}
|
|
|
|
bool RoleCoordinator::is_raw_write_or_flashback_mode(const AccessMode &mode) const
|
|
{
|
|
return (AccessMode::RAW_WRITE == mode || AccessMode::FLASHBACK == mode ||
|
|
AccessMode::PREPARE_FLASHBACK == mode);
|
|
}
|
|
|
|
bool RoleCoordinator::need_retry_submit_role_change_event_(int ret) const
|
|
{
|
|
bool bool_ret = false;
|
|
if (OB_TIMEOUT == ret) {
|
|
bool_ret = true;
|
|
}
|
|
return bool_ret;
|
|
}
|
|
|
|
} // end namespace palfcluster
|
|
} // end namespace oceanbase
|