fixed offline can not been finished when palf is FOLLOWER PENDING
This commit is contained in:
@ -556,14 +556,14 @@ int ObApplyStatus::switch_to_leader(const int64_t new_proposal_id)
|
||||
} else if (is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
CLOG_LOG(INFO, "apply status has been stopped");
|
||||
} else if (new_proposal_id < proposal_id_) {
|
||||
} else if (new_proposal_id < ATOMIC_LOAD(&proposal_id_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid proposal id", K(ret), K(new_proposal_id), KPC(this));
|
||||
} else if (LEADER == role_) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
CLOG_LOG(WARN, "apply status has already been leader", KPC(this));
|
||||
} else {
|
||||
proposal_id_ = new_proposal_id;
|
||||
ATOMIC_STORE(&proposal_id_, new_proposal_id);
|
||||
role_ = LEADER;
|
||||
CLOG_LOG(INFO, "apply status switch_to_leader success", KPC(this));
|
||||
}
|
||||
@ -572,8 +572,12 @@ int ObApplyStatus::switch_to_leader(const int64_t new_proposal_id)
|
||||
|
||||
int ObApplyStatus::switch_to_follower()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuardWithRetryInterval guard(lock_, WRLOCK_RETRY_INTERVAL_US, WRLOCK_RETRY_INTERVAL_US);
|
||||
return switch_to_follower_();
|
||||
if (OB_FAIL(switch_to_follower_())) {
|
||||
CLOG_LOG(WARN, "ObApplyStatus switch_to_follower_ failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
//需要锁保护
|
||||
@ -610,10 +614,11 @@ int ObApplyStatus::update_palf_committed_end_lsn(const palf::LSN &end_lsn,
|
||||
} else {
|
||||
{
|
||||
RLockGuard guard(lock_);
|
||||
int64_t curr_proposal_id = ATOMIC_LOAD(&proposal_id_);
|
||||
if (is_in_stop_state_) {
|
||||
//skip
|
||||
CLOG_LOG(WARN, "apply status has been stopped", K(ret), KPC(this));
|
||||
} else if (proposal_id == proposal_id_ && LEADER == role_) {
|
||||
} else if (proposal_id == curr_proposal_id && LEADER == role_) {
|
||||
if (palf_committed_end_lsn_ >= end_lsn) {
|
||||
CLOG_LOG(ERROR, "invalid new end_lsn", KPC(this), K(proposal_id), K(end_lsn));
|
||||
} else {
|
||||
@ -622,8 +627,8 @@ int ObApplyStatus::update_palf_committed_end_lsn(const palf::LSN &end_lsn,
|
||||
CLOG_LOG(ERROR, "submit_task_to_apply_service_ failed", KPC(this), K(ret), K(proposal_id), K(end_lsn));
|
||||
}
|
||||
}
|
||||
} else if ((proposal_id == proposal_id_ && FOLLOWER == role_)
|
||||
|| proposal_id < proposal_id_) {
|
||||
} else if ((proposal_id == curr_proposal_id && FOLLOWER == role_)
|
||||
|| proposal_id < curr_proposal_id) {
|
||||
// apply切为follower之后, 同proposal_id的日志不应该还能滑出
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(ERROR, "invalid new end_lsn", KPC(this), K(proposal_id), K(end_lsn));
|
||||
@ -769,6 +774,12 @@ int ObApplyStatus::diagnose(ApplyDiagnoseInfo &diagnose_info)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObApplyStatus::reset_proposal_id()
|
||||
{
|
||||
ATOMIC_STORE(&proposal_id_, -1);
|
||||
CLOG_LOG(INFO, "reset_proposal_id success");
|
||||
}
|
||||
|
||||
int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -793,6 +804,7 @@ int ObApplyStatus::update_last_check_log_ts_ns_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRole palf_role = FOLLOWER;
|
||||
int64_t curr_proposal_id = ATOMIC_LOAD(&proposal_id_);
|
||||
int64_t palf_proposal_id = -1;
|
||||
bool is_pending_state = true;
|
||||
int64_t palf_max_ts_ns = OB_INVALID_TIMESTAMP;
|
||||
@ -804,8 +816,8 @@ int ObApplyStatus::update_last_check_log_ts_ns_()
|
||||
//防御性检查, palf的max_ts不应该回退到已达成一致的max_applied_cb_ts_ns_之前
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(ERROR, "invalid palf_max_ts_ns", K(ret), K(ls_id_), K(palf_max_ts_ns), KPC(this));
|
||||
} else if ((palf_proposal_id != proposal_id_) || (FOLLOWER == palf_role)) {
|
||||
if (palf_proposal_id < proposal_id_) {
|
||||
} else if ((palf_proposal_id != curr_proposal_id) || (FOLLOWER == palf_role)) {
|
||||
if (palf_proposal_id < curr_proposal_id) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(ERROR, "invalid palf_proposal_id", K(ret), K(ls_id_), K(palf_proposal_id), KPC(this));
|
||||
} else {
|
||||
|
@ -171,6 +171,28 @@ public:
|
||||
int stat(LSApplyStat &stat) const;
|
||||
int handle_drop_cb();
|
||||
int diagnose(ApplyDiagnoseInfo &diagnose_info);
|
||||
// offline相关
|
||||
//
|
||||
// The constraint between palf and apply:
|
||||
//
|
||||
// Palf guarantee that switch apply to follower only when there is not
|
||||
// any uncommitted logs in previous LEADER, therefore, apply only update
|
||||
// 'palf_committed_end_lsn_' when 'proposal_id_' is as same as current
|
||||
// proposal_id of palf.
|
||||
//
|
||||
// To increase robustness, apply assums that update 'palf_committed_end_lsn_'
|
||||
// when the role of apply is LEADER execpet above constraints. otherwise,
|
||||
// apply consider it as unexpected error.
|
||||
//
|
||||
// However, in rebuild scenario, apply will be reset to FOLLOWER even if there
|
||||
// are logs to be committed when 'proposal_id_' is as same as current proposal_id
|
||||
// of palf.
|
||||
//
|
||||
// To solve above problem, add an interface which used to reset 'proposal_id_' of
|
||||
// apply.
|
||||
//
|
||||
// NB: this interface only can be used in 'ObLogHandler::offline'.
|
||||
void reset_proposal_id();
|
||||
TO_STRING_KV(K(ls_id_),
|
||||
K(role_),
|
||||
K(proposal_id_),
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "lib/oblog/ob_log.h"
|
||||
#include "logservice/applyservice/ob_log_apply_service.h"
|
||||
#include "logservice/replayservice/ob_log_replay_service.h"
|
||||
#include "logservice/rcservice/ob_role_change_service.h"
|
||||
#include "logservice/logrpc/ob_log_rpc_req.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "logservice/palf/lsn.h"
|
||||
@ -37,6 +38,7 @@ ObLogHandler::ObLogHandler() : self_(),
|
||||
apply_status_(NULL),
|
||||
apply_service_(NULL),
|
||||
replay_service_(NULL),
|
||||
rc_service_(NULL),
|
||||
deps_lock_(),
|
||||
lc_cb_(NULL),
|
||||
rpc_proxy_(NULL),
|
||||
@ -45,6 +47,7 @@ ObLogHandler::ObLogHandler() : self_(),
|
||||
last_check_sync_ts_(OB_INVALID_TIMESTAMP),
|
||||
last_renew_loc_ts_(OB_INVALID_TIMESTAMP),
|
||||
is_in_stop_state_(true),
|
||||
is_offline_(false),
|
||||
is_inited_(false),
|
||||
get_max_decided_log_ts_ns_debug_time_(OB_INVALID_TIMESTAMP)
|
||||
{
|
||||
@ -59,6 +62,7 @@ int ObLogHandler::init(const int64_t id,
|
||||
const common::ObAddr &self,
|
||||
ObLogApplyService *apply_service,
|
||||
ObLogReplayService *replay_service,
|
||||
ObRoleChangeService *rc_service,
|
||||
PalfHandle &palf_handle,
|
||||
PalfEnv *palf_env,
|
||||
PalfLocationCacheCb *lc_cb,
|
||||
@ -86,6 +90,7 @@ int ObLogHandler::init(const int64_t id,
|
||||
get_max_decided_log_ts_ns_debug_time_ = OB_INVALID_TIMESTAMP;
|
||||
apply_service_ = apply_service;
|
||||
replay_service_ = replay_service;
|
||||
rc_service_ = rc_service;
|
||||
apply_status_->inc_ref();
|
||||
id_ = id;
|
||||
self_ = self;
|
||||
@ -95,6 +100,7 @@ int ObLogHandler::init(const int64_t id,
|
||||
lc_cb_ = lc_cb;
|
||||
rpc_proxy_ = rpc_proxy;
|
||||
is_in_stop_state_ = false;
|
||||
is_offline_ = false;
|
||||
is_inited_ = true;
|
||||
FLOG_INFO("ObLogHandler init success", K(id), K(palf_handle));
|
||||
}
|
||||
@ -161,6 +167,7 @@ void ObLogHandler::destroy()
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
is_inited_ = false;
|
||||
is_offline_ = false;
|
||||
is_in_stop_state_ = true;
|
||||
common::ObSpinLockGuard deps_guard(deps_lock_);
|
||||
apply_service_->revert_apply_status(apply_status_);
|
||||
@ -170,6 +177,7 @@ void ObLogHandler::destroy()
|
||||
if (true == palf_handle_.is_valid()) {
|
||||
palf_env_->close(palf_handle_);
|
||||
}
|
||||
rc_service_ = NULL;
|
||||
lc_cb_ = NULL;
|
||||
rpc_proxy_ = NULL;
|
||||
palf_env_ = NULL;
|
||||
@ -201,7 +209,7 @@ int ObLogHandler::append(const void *buffer,
|
||||
cb->set_append_start_ts(ObTimeUtility::fast_current_time());
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_in_stop_state_) {
|
||||
} else if (is_in_stop_state_ || is_offline_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (LEADER != ATOMIC_LOAD(&role_)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
@ -1320,6 +1328,52 @@ int ObLogHandler::diagnose(LogHandlerDiagnoseInfo &diagnose_info) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::offline()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (true == is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (OB_FAIL(disable_replay())) {
|
||||
CLOG_LOG(WARN, "disable_replay failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(disable_sync())) {
|
||||
CLOG_LOG(WARN, "disable_sync failed", K(ret), KPC(this));
|
||||
} else {
|
||||
WLockGuard guard(lock_);
|
||||
// NB: make proposal_id_ to be invalid:
|
||||
// 1. avoid append success.
|
||||
// 2. make role change success(role change service require proposal_id of log_handler is not same as palf)
|
||||
// 3. don't make role to follower at here, otherwise, role change thread will execute follower to follower.
|
||||
proposal_id_ = INVALID_PROPOSAL_ID;
|
||||
|
||||
// NB:
|
||||
// 1. After set 'is_offline_' to true, we must prohibit apply log, otherwise,
|
||||
// log handler may be come LEADER after offline, and the proposal id of apply
|
||||
// is -1, update committed end ls of appy will print ERROR logs.
|
||||
//
|
||||
// 2. Must reset proposal_id of apply_status_ before set 'is_offline', otherwise,
|
||||
// concurrent 'switch to follower' event may set apply status to FOLLOWER, however,
|
||||
// there are some uncommitted logs in PALF. and before reset_proposal_id, these
|
||||
// uncommitted logs has been committed. and then update committed end ls of apply
|
||||
// will print ERROR logs, because the role of apply is FOLLOWER, and the proposal_id
|
||||
// of apply is as same as PALF.
|
||||
apply_status_->reset_proposal_id();
|
||||
//
|
||||
// 3. Must keep the order of set 'is_offline_' between reset the proposal id of apply.
|
||||
//
|
||||
MEM_BARRIER();
|
||||
is_offline_ = true;
|
||||
// NB: must ensure on_role_change not fail.
|
||||
if (OB_FAIL(rc_service_->on_role_change(id_))) {
|
||||
CLOG_LOG(ERROR, "on_role_change failed", K(ret), KPC(this));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "LogHandler offline success", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1333,5 +1387,44 @@ int ObLogHandler::diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::online(const LSN &lsn, const int64_t log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_decided_ts_ns = OB_INVALID_TIMESTAMP;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (true == is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (OB_FAIL(get_max_decided_log_ts_ns(max_decided_ts_ns))) {
|
||||
CLOG_LOG(WARN, "get_max_decided_log_ts_ns failed", K(ret), KPC(this));
|
||||
} else if (log_ts < max_decided_ts_ns) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
CLOG_LOG(WARN, "base log ts is less than max decided log ts, not supported",
|
||||
K(ret), KPC(this), K(log_ts), K(max_decided_ts_ns));
|
||||
} else if (OB_FAIL(enable_replay(lsn, log_ts))) {
|
||||
CLOG_LOG(WARN, "enable_replay failed", K(ret), KPC(this), K(lsn), K(log_ts));
|
||||
} else if (OB_FAIL(enable_sync())) {
|
||||
CLOG_LOG(WARN, "enable_sync failed", K(ret), KPC(this));
|
||||
} else {
|
||||
WLockGuard guard(lock_);
|
||||
proposal_id_ = INVALID_PROPOSAL_ID;
|
||||
is_offline_ = false;
|
||||
// NB: before notify role change service, we need set role to FOLLOWER,
|
||||
// otherwise, role change service may need switch leader to leader.
|
||||
role_ = common::FOLLOWER;
|
||||
if (OB_FAIL(rc_service_->on_role_change(id_))) {
|
||||
CLOG_LOG(WARN, "on_role_change failed", K(ret), KPC(this));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "LogHander online success", K(ret), KPC(this), K(lsn), K(log_ts));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObLogHandler::is_offline() const
|
||||
{
|
||||
return true == ATOMIC_LOAD(&is_offline_);
|
||||
}
|
||||
} // end namespace logservice
|
||||
} // end napespace oceanbase
|
||||
|
@ -52,6 +52,7 @@ struct LogHandlerDiagnoseInfo {
|
||||
K(log_handler_proposal_id_));
|
||||
};
|
||||
|
||||
class ObRoleChangeService;
|
||||
class ObILogHandler
|
||||
{
|
||||
public:
|
||||
@ -135,6 +136,9 @@ public:
|
||||
virtual int enable_vote() = 0;
|
||||
virtual int register_rebuild_cb(palf::PalfRebuildCb *rebuild_cb) = 0;
|
||||
virtual int unregister_rebuild_cb() = 0;
|
||||
virtual int offline() = 0;
|
||||
virtual int online(const palf::LSN &lsn, const int64_t log_ts) = 0;
|
||||
virtual bool is_offline() const = 0;
|
||||
};
|
||||
|
||||
class ObLogHandler : public ObILogHandler, public ObLogHandlerBase
|
||||
@ -147,6 +151,7 @@ public:
|
||||
const common::ObAddr &self,
|
||||
ObLogApplyService *apply_service,
|
||||
ObLogReplayService *replay_service,
|
||||
ObRoleChangeService *rc_service,
|
||||
palf::PalfHandle &palf_handle,
|
||||
palf::PalfEnv *palf_env,
|
||||
palf::PalfLocationCacheCb *lc_cb,
|
||||
@ -512,6 +517,9 @@ public:
|
||||
int diagnose(LogHandlerDiagnoseInfo &diagnose_info) const;
|
||||
int diagnose_palf(palf::PalfDiagnoseInfo &diagnose_info) const;
|
||||
TO_STRING_KV(K_(role), K_(proposal_id), KP(palf_env_), K(is_in_stop_state_), K(is_inited_));
|
||||
int offline() override final;
|
||||
int online(const palf::LSN &lsn, const int64_t log_ts) override final;
|
||||
bool is_offline() const override final;
|
||||
private:
|
||||
int submit_config_change_cmd_(const LogConfigChangeCmd &req);
|
||||
int get_leader_max_ts_ns_(int64_t &max_ts_ns) const;
|
||||
@ -522,6 +530,7 @@ private:
|
||||
ObApplyStatus *apply_status_;
|
||||
ObLogApplyService *apply_service_;
|
||||
ObLogReplayService *replay_service_;
|
||||
ObRoleChangeService *rc_service_;
|
||||
ObSpinLock deps_lock_;
|
||||
mutable palf::PalfLocationCacheCb *lc_cb_;
|
||||
mutable obrpc::ObLogServiceRpcProxy *rpc_proxy_;
|
||||
@ -531,6 +540,7 @@ private:
|
||||
mutable int64_t last_check_sync_ts_;
|
||||
mutable int64_t last_renew_loc_ts_;
|
||||
bool is_in_stop_state_;
|
||||
bool is_offline_;
|
||||
bool is_inited_;
|
||||
mutable int64_t get_max_decided_log_ts_ns_debug_time_;
|
||||
};
|
||||
|
@ -385,8 +385,8 @@ int ObLogService::add_ls(const ObLSID &id,
|
||||
} else if (OB_FAIL(replay_service_.add_ls(id,
|
||||
replica_type))) {
|
||||
CLOG_LOG(WARN, "failed to add_ls for replay_service", K(ret), K(id));
|
||||
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
|
||||
palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
|
||||
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
|
||||
&role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
|
||||
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), K(id), KP(palf_env_), K(palf_handle));
|
||||
} else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) {
|
||||
CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_));
|
||||
@ -600,8 +600,8 @@ int ObLogService::create_ls_(const share::ObLSID &id,
|
||||
CLOG_LOG(WARN, "failed to add_ls for apply engine", K(ret), K(id));
|
||||
} else if (OB_FAIL(replay_service_.add_ls(id, replica_type))) {
|
||||
CLOG_LOG(WARN, "failed to add_ls", K(ret), K(id));
|
||||
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
|
||||
palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
|
||||
} else if (OB_FAIL(log_handler.init(id.id(), self_, &apply_service_, &replay_service_,
|
||||
&role_change_service_, palf_handle, palf_env_, loc_cache_cb, &rpc_proxy_))) {
|
||||
CLOG_LOG(WARN, "ObLogHandler init failed", K(ret), KP(palf_env_), K(palf_handle));
|
||||
} else if (OB_FAIL(restore_handler.init(id.id(), palf_env_))) {
|
||||
CLOG_LOG(WARN, "ObLogRestoreHandler init failed", K(ret), K(id), KP(palf_env_));
|
||||
|
@ -261,7 +261,16 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_(
|
||||
ObLS *ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool need_transform_by_access_mode = (curr_access_mode == AccessMode::RAW_WRITE ? false : true);
|
||||
const bool log_handler_is_offline = ls->get_log_handler()->is_offline();
|
||||
|
||||
// If log handler is offline, need execute LEADER_2_FOLLOWER or FOLLOWER_2_FOLLOWER.
|
||||
//
|
||||
// when access mode is RAW_WRITE or FLASHBACK, restore_handler need execute leader to
|
||||
// follower or follower to leader. otherwise, only need execute leader to follower or
|
||||
// follower to follower, therefore, we set 'need_transform_by_access_mode' to false
|
||||
// when 'curr_access_mode' is RAW_WRITE or FLASHBACK
|
||||
const bool only_need_change_to_follower =
|
||||
!is_raw_write_or_flashback_mode(curr_access_mode) || log_handler_is_offline;
|
||||
RoleChangeOptType opt_type;
|
||||
ObRole curr_role = ObRole::INVALID_ROLE;
|
||||
ObRole new_role = ObRole::INVALID_ROLE;
|
||||
@ -274,11 +283,11 @@ int ObRoleChangeService::handle_role_change_cb_event_for_restore_handler_(
|
||||
new_proposal_id, is_pending_state))) {
|
||||
CLOG_LOG(WARN, "ObRestoreHandler prepare_switch_role failed", K(ret), K(curr_role), K(curr_proposal_id),
|
||||
K(new_role), K(new_proposal_id));
|
||||
} else if (true == is_pending_state
|
||||
|| false == check_need_execute_role_change_(curr_proposal_id, curr_role, new_proposal_id, new_role)) {
|
||||
} else if (false == need_execute_role_change(curr_proposal_id, new_proposal_id,
|
||||
is_pending_state, log_handler_is_offline)) {
|
||||
CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id),
|
||||
K(new_role), K(new_proposal_id));
|
||||
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, need_transform_by_access_mode))) {
|
||||
K(new_role), K(new_proposal_id), K(is_pending_state), K(log_handler_is_offline));
|
||||
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, only_need_change_to_follower))) {
|
||||
} else {
|
||||
switch (opt_type) {
|
||||
// leader -> follower
|
||||
@ -319,7 +328,15 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
|
||||
ObLS *ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool need_transform_by_access_mode = (curr_access_mode == AccessMode::APPEND ? false : true);
|
||||
const bool log_handler_is_offline = ls->get_log_handler()->is_offline();
|
||||
|
||||
// If log handler is offline, need execute LEADER_2_FOLLOWER or FOLLOWER_2_FOLLOWER
|
||||
//
|
||||
// when access mode is APPEND, log_handler need execute leader to follower or
|
||||
// follower to leader. otherwise, only need execute leader to follower or follower
|
||||
// to follower, therefore, we set 'need_transform_by_access_mode' to false when
|
||||
// 'curr_access_mode' is APPEND.
|
||||
const bool only_need_change_to_follower = !is_append_mode(curr_access_mode) || log_handler_is_offline;
|
||||
RoleChangeOptType opt_type;
|
||||
ObRole curr_role = ObRole::INVALID_ROLE;
|
||||
ObRole new_role = ObRole::INVALID_ROLE;
|
||||
@ -333,11 +350,11 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
|
||||
} else if (true == is_pending_state) {
|
||||
CLOG_LOG(INFO, "curr state of palf is follower pending, need ignore this signal", K(curr_role), K(curr_proposal_id),
|
||||
K(new_role), K(new_proposal_id), K(is_pending_state));
|
||||
} else if (true == is_pending_state
|
||||
|| false == check_need_execute_role_change_(curr_proposal_id, curr_role, new_proposal_id, new_role)) {
|
||||
} else if (false == need_execute_role_change(curr_proposal_id, new_proposal_id,
|
||||
is_pending_state, log_handler_is_offline)) {
|
||||
CLOG_LOG(INFO, "no need change role", K(ret), K(is_pending_state), K(curr_role), K(curr_proposal_id),
|
||||
K(new_role), K(new_proposal_id));
|
||||
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, need_transform_by_access_mode))) {
|
||||
K(new_role), K(new_proposal_id), K(is_pending_state), K(log_handler_is_offline));
|
||||
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, only_need_change_to_follower))) {
|
||||
} else {
|
||||
switch (opt_type) {
|
||||
// leader -> follower
|
||||
@ -805,14 +822,16 @@ ObRoleChangeService::RoleChangeOptType ObRoleChangeService::get_role_change_opt_
|
||||
return rc_opt_type;
|
||||
}
|
||||
|
||||
// NB: if there are no pending logs in palf, leader to follower no need wait new leader.
|
||||
// therefore, the proposal id of log handler and palf will be same, but we also need
|
||||
// change leader to follower.
|
||||
bool ObRoleChangeService::check_need_execute_role_change_(
|
||||
const int64_t curr_proposal_id, const common::ObRole &curr_role,
|
||||
const int64_t new_proposal_id, const common::ObRole &new_role) const
|
||||
// NB:
|
||||
// 1. when log handler is offline, need execute role change;
|
||||
// 2. when proposal id are not same and is not in pending, need execute role change.
|
||||
bool ObRoleChangeService::need_execute_role_change(const int64_t curr_proposal_id,
|
||||
const int64_t new_proposal_id,
|
||||
const bool is_pending_state,
|
||||
const bool is_offline) const
|
||||
{
|
||||
return curr_proposal_id != new_proposal_id || curr_role != new_role;
|
||||
return is_offline
|
||||
|| (!is_pending_state && curr_proposal_id != new_proposal_id);
|
||||
}
|
||||
|
||||
int ObRoleChangeService::diagnose(RCDiagnoseInfo &diagnose_info)
|
||||
@ -829,5 +848,16 @@ int ObRoleChangeService::diagnose(RCDiagnoseInfo &diagnose_info)
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObRoleChangeService::is_append_mode(const palf::AccessMode &access_mode) const
|
||||
{
|
||||
return palf::AccessMode::APPEND == access_mode;
|
||||
}
|
||||
|
||||
bool ObRoleChangeService::is_raw_write_or_flashback_mode(const palf::AccessMode &access_mode) const
|
||||
{
|
||||
return palf::AccessMode::RAW_WRITE == access_mode
|
||||
|| palf::AccessMode::FLASHBACK == access_mode;
|
||||
}
|
||||
} // end namespace logservice
|
||||
} // end namespace oceanbase
|
||||
|
@ -112,8 +112,13 @@ private:
|
||||
const int64_t proposal_id,
|
||||
const share::ObLSID &ls_id,
|
||||
palf::LSN &end_lsn);
|
||||
bool check_need_execute_role_change_(const int64_t curr_proposal_id, const common::ObRole &curr_role,
|
||||
const int64_t new_proposal_id, const common::ObRole &new_role) const;
|
||||
bool need_execute_role_change(const int64_t curr_proposal_id,
|
||||
const int64_t new_proposal_id,
|
||||
const bool is_pending_state,
|
||||
const bool is_offline) const;
|
||||
|
||||
bool is_append_mode(const palf::AccessMode &access_mode) const;
|
||||
bool is_raw_write_or_flashback_mode(const palf::AccessMode &access_mode) const;
|
||||
private:
|
||||
enum class RoleChangeOptType {
|
||||
INVALID_RC_OPT_TYPE = 0,
|
||||
|
@ -584,17 +584,6 @@ void ObLS::destroy()
|
||||
tenant_id_ = OB_INVALID_TENANT_ID;
|
||||
}
|
||||
|
||||
int ObLS::offline_log_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(log_handler_.disable_sync())) {
|
||||
LOG_WARN("failed to disable sync", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(log_handler_.disable_replay())) {
|
||||
LOG_WARN("failed to disable replay", K(ret), K(ls_meta_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::offline_tx_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -619,7 +608,7 @@ int ObLS::offline_()
|
||||
LOG_WARN("checkpoint executor offline failed", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(ls_wrs_handler_.offline())) {
|
||||
LOG_WARN("weak read handler offline failed", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(offline_log_())) {
|
||||
} else if (OB_FAIL(log_handler_.offline())) {
|
||||
LOG_WARN("failed to offline log", K(ret));
|
||||
} else if (OB_FAIL(ls_ddl_log_handler_.offline())) {
|
||||
LOG_WARN("ddl log handler offline failed", K(ret), K(ls_meta_));
|
||||
@ -654,18 +643,6 @@ int ObLS::offline()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::online_log_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(log_handler_.enable_replay(ls_meta_.get_clog_base_lsn(),
|
||||
ls_meta_.get_clog_checkpoint_ts()))) {
|
||||
LOG_WARN("failed to enable replay", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(log_handler_.enable_sync())) {
|
||||
LOG_WARN("failed to enable sync", K(ret), K(ls_meta_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::online_tx_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -694,7 +671,8 @@ int ObLS::online()
|
||||
LOG_WARN("ls tx online failed", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(ls_ddl_log_handler_.online())) {
|
||||
LOG_WARN("ddl log handler online failed", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(online_log_())) {
|
||||
} else if (OB_FAIL(log_handler_.online(ls_meta_.get_clog_base_lsn(),
|
||||
ls_meta_.get_clog_checkpoint_ts()))) {
|
||||
LOG_WARN("failed to online log", K(ret));
|
||||
} else if (OB_FAIL(ls_wrs_handler_.online())) {
|
||||
LOG_WARN("weak read handler online failed", K(ret), K(ls_meta_));
|
||||
|
@ -308,8 +308,6 @@ private:
|
||||
int prepare_for_safe_destroy_();
|
||||
int flush_if_need_(const bool need_flush);
|
||||
int offline_();
|
||||
int offline_log_();
|
||||
int online_log_();
|
||||
int offline_tx_();
|
||||
int online_tx_();
|
||||
public:
|
||||
|
@ -341,6 +341,9 @@ public:
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int unregister_rebuild_cb() { return OB_SUCCESS; }
|
||||
bool is_offline() const {return false;};
|
||||
int offline() {return OB_SUCCESS;};
|
||||
int online(const LSN &lsn, const int64_t ts) { UNUSED(lsn); UNUSED(ts); return OB_SUCCESS;};
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
Reference in New Issue
Block a user