[bugfix] fix that replaying log after role switched to leader

This commit is contained in:
obdev 2024-02-07 23:36:10 +00:00 committed by ob-robot
parent d63522ff95
commit cbaed137f1
9 changed files with 178 additions and 24 deletions

View File

@ -48,6 +48,10 @@ public:
}
public:
bool is_idle() const
{
return IDLE == ATOMIC_LOAD(&status_);
}
bool acquire()
{
bool bool_ret = false;

View File

@ -574,6 +574,8 @@ class EventTable
EN_UNDO_ACTIONS_SIZE_OVERFLOW = 276,
EN_PART_PLUS_UNDO_OVERFLOW = 277,
EN_HANDLE_PREPARE_MESSAGE_EAGAIN = 278,
EN_RC_ONLY_LEADER_TO_LEADER = 279,
EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP = 280,
//simulate DAS errors 301-350
EN_DAS_SCAN_RESULT_OVERFLOW = 301,

View File

@ -25,6 +25,7 @@
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_handle.h"
#include "share/ob_occam_time_guard.h"
#include "observer/ob_server_event_history_table_operator.h"
namespace oceanbase
{
@ -323,7 +324,15 @@ int ObRoleChangeService::handle_role_change_event_(const RoleChangeEvent &event,
switch (event.event_type_) {
case RoleChangeEventType::CHANGE_LEADER_EVENT_TYPE:
CLOG_LOG(INFO, "begin change leader", K(curr_access_mode), K(event), KPC(ls));
if (is_append_mode(curr_access_mode)
#ifdef ERRSIM
ret = OB_E(EventTable::EN_RC_ONLY_LEADER_TO_LEADER) OB_SUCCESS;
if (OB_FAIL(ret)) {
ls->get_log_restore_handler()->change_leader_to(event.dst_addr_);
CLOG_LOG(INFO, "fake EN_RC_ONLY_LEADER_TO_LEADER with change_leader_event", KPC(ls), K(event));
}
#endif
if (OB_FAIL(ret)) {
} else if (is_append_mode(curr_access_mode)
&& OB_FAIL(handle_change_leader_event_for_log_handler_(event.dst_addr_, ls))) {
CLOG_LOG(WARN, "ObLogHandler change leader failed", K(ret), K(event), KPC(ls));
} else if (is_raw_write_or_flashback_mode(curr_access_mode)
@ -456,39 +465,51 @@ int ObRoleChangeService::handle_role_change_cb_event_for_log_handler_(
K(new_role), K(new_proposal_id), K(is_pending_state), K(log_handler_is_offline));
} else if (FALSE_IT(opt_type = get_role_change_opt_type_(curr_role, new_role, only_need_change_to_follower))) {
} else {
#ifdef ERRSIM
ret = OB_E(EventTable::EN_RC_ONLY_LEADER_TO_LEADER) OB_SUCCESS;
if (RoleChangeOptType::LEADER_2_LEADER == opt_type) {
ret = OB_SUCCESS;
}
if (OB_FAIL(ret)) {
CLOG_LOG(INFO, "fake EN_RC_ONLY_LEADER_TO_LEADER with role_change_event", KPC(ls), K(opt_type));
}
#endif
switch (opt_type) {
// leader -> follower
case RoleChangeOptType::LEADER_2_FOLLOWER:
if (OB_FAIL(switch_leader_to_follower_forcedly_(new_proposal_id, ls))) {
if (OB_SUCC(ret) && OB_FAIL(switch_leader_to_follower_forcedly_(new_proposal_id, ls))) {
CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(curr_role),
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
}
break;
// follower -> follower
// follower -> follower
case RoleChangeOptType::FOLLOWER_2_LEADER:
if (OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), K(curr_role),
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
if (OB_SUCC(ret) && OB_FAIL(switch_follower_to_leader_(new_proposal_id, ls, retry_ctx))) {
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(curr_role), K(curr_proposal_id), K(new_role),
K(curr_access_mode), K(new_proposal_id));
}
break;
// leader -> leader
// leader -> leader
case RoleChangeOptType::LEADER_2_LEADER:
if (OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id, ls, retry_ctx))) {
if (OB_SUCC(ret) && OB_FAIL(switch_leader_to_leader_(new_proposal_id, curr_proposal_id,
ls, retry_ctx))) {
CLOG_LOG(WARN, "switch_leader_to_leader_ failed", K(ret), K(curr_role),
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
}
break;
// follower -> follower
case RoleChangeOptType::FOLLOWER_2_FOLLOWER:
if (OB_FAIL(switch_follower_to_follower_(new_proposal_id, ls))) {
CLOG_LOG(WARN, "switch_follower_to_follower_ failed", K(ret), K(curr_role),
K(curr_proposal_id), K(new_role), K(curr_access_mode), K(new_proposal_id));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpected error, can not handle role change", K(ret), K(curr_role),
K(curr_proposal_id), K(new_role), K(new_proposal_id), KPC(ls));
// follower -> follower
case RoleChangeOptType::FOLLOWER_2_FOLLOWER:
if (OB_SUCC(ret) && OB_FAIL(switch_follower_to_follower_(new_proposal_id, ls))) {
CLOG_LOG(WARN, "switch_follower_to_follower_ failed", K(curr_role), K(curr_proposal_id),
K(new_role), K(curr_access_mode), K(new_proposal_id));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpected error, can not handle role change", K(ret),
K(curr_role), K(curr_proposal_id), K(new_role), K(new_proposal_id), KPC(ls));
}
}
return ret;
@ -585,6 +606,10 @@ int ObRoleChangeService::switch_follower_to_leader_(
CLOG_LOG(WARN, "apply_service_ switch_to_leader failed", K(ret), K(new_role), K(new_proposal_id));
} else if (FALSE_IT(time_guard.click("replay_service->switch_to_leader"))
|| OB_FAIL(replay_service_->switch_to_leader(ls_id))) {
CLOG_LOG(WARN, "replay_service_ switch_to_leader failed", K(new_role), K(new_proposal_id));
} else if (FALSE_IT(time_guard.click("wait_replay_service_submit_task_clear_"))
|| OB_FAIL(wait_replay_service_submit_task_clear_(ls_id))) {
CLOG_LOG(ERROR, "wait_replay_service_submit_task_clear_ failed", K(new_role), K(new_proposal_id));
} else if (FALSE_IT(log_handler->switch_role(new_role, new_proposal_id))) {
CLOG_LOG(WARN, "ObLogHandler switch role failed", K(ret), K(new_role), K(new_proposal_id));
} else if (FALSE_IT(ATOMIC_SET(&cur_task_info_.state_, TakeOverState::WAIT_RC_HANDLER_DONE))) {
@ -712,6 +737,9 @@ int ObRoleChangeService::switch_leader_to_leader_(
RetrySubmitRoleChangeEventCtx &retry_ctx)
{
int ret = OB_SUCCESS;
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("LOGSERVICE", "BEFORE_LEADER_TO_LEADER_RC");
#endif
ObTimeGuard time_guard("switch_leader_to_leader", EACH_ROLE_CHANGE_COST_MAX_TIME);
if (FALSE_IT(time_guard.click("switch_leader_to_follower_forcedly_"))
|| OB_FAIL(switch_leader_to_follower_forcedly_(curr_proposal_id, ls))) {
@ -846,6 +874,25 @@ int ObRoleChangeService::wait_replay_service_replay_done_(
return ret;
}
int ObRoleChangeService::wait_replay_service_submit_task_clear_(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
bool is_clear = false;
const int64_t start_ts = ObTimeUtility::current_time();
while (OB_SUCC(ret) && (!is_clear)) {
if (OB_FAIL(replay_service_->is_submit_task_clear(ls_id, is_clear))) {
CLOG_LOG(WARN, "replay_service_ is_submit_task_clean failed", K(is_clear));
} else if (!is_clear) {
ob_usleep(1 * 1000);
if (REACH_TIME_INTERVAL(100 * 1000L)) {
CLOG_LOG(WARN, "submit_task is not clear, need retry", K(ls_id), K(start_ts));
}
} else {/*do nothing*/}
}
CLOG_LOG(INFO, "wait_replay_service_submit_task_clear_ finish", K(ls_id), K(is_clear));
return ret;
}
int ObRoleChangeService::wait_apply_service_apply_done_(
const share::ObLSID &ls_id,
palf::LSN &end_lsn)

View File

@ -165,6 +165,8 @@ private:
int wait_replay_service_replay_done_(const share::ObLSID &ls_id,
const palf::LSN &end_lsn,
const int64_t timeout_us);
int wait_replay_service_submit_task_clear_(const share::ObLSID &ls_id);
int wait_apply_service_apply_done_(const share::ObLSID &ls_id,
palf::LSN &end_lsn);
int wait_apply_service_apply_done_when_change_leader_(const ObLogHandler *log_handler,

View File

@ -21,6 +21,7 @@
#include "share/ob_thread_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
#include "observer/ob_server_event_history_table_operator.h"
namespace oceanbase
{
@ -577,6 +578,27 @@ int ObLogReplayService::is_replay_done(const share::ObLSID &id,
return ret;
}
int ObLogReplayService::is_submit_task_clear(const share::ObLSID &id, bool &is_clear)
{
int ret = OB_SUCCESS;
ObReplayStatus *replay_status = NULL;
ObReplayStatusGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "replay service not init", K(ret));
} else if (OB_FAIL(get_replay_status_(id, guard))) {
CLOG_LOG(WARN, "guard get replay status failed", K(id));
} else if (NULL == (replay_status = guard.get_replay_status())) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "replay status is not exist", K(id));
} else if (OB_FAIL(replay_status->is_submit_task_clear(is_clear))){
CLOG_LOG(WARN, "check replay done failed", K(id));
} else {
// do nothing
}
return ret;
}
//通用接口, 受控回放时最终返回值为受控回放点前的最后一条日志的log_ts
int ObLogReplayService::get_max_replayed_scn(const share::ObLSID &id, SCN &scn)
{
@ -850,6 +872,10 @@ int ObLogReplayService::do_replay_task_(ObLogReplayTask *replay_task,
CLOG_LOG(ERROR, "invalid argument", KPC(replay_task), KPC(replay_status), KR(ret));
} else if (OB_ISNULL(ls_adapter_)) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(replay_status->check_can_replay())) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
CLOG_LOG(INFO, "can not replay log", KPC(replay_status), KPC(replay_task));
}
} else if (OB_FAIL(replay_status->check_replay_barrier(replay_task, replay_log_buff,
need_replay, replay_queue_idx))) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
@ -1117,6 +1143,13 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
if (!replay_status->is_enabled_without_lock() || !replay_status->need_submit_log()) {
need_submit_log = false;
} else {
#ifdef ERRSIM
int tmp_ret = OB_E(EventTable::EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP) OB_SUCCESS;
if (OB_SUCCESS != tmp_ret) {
usleep(300 * 1000);
CLOG_LOG(INFO, "sleep 300ms before read log", KPC(submit_task), KPC(replay_status), KR(tmp_ret));
}
#endif
const SCN &replayable_point = inner_get_replayable_point_();
need_submit_log = submit_task->has_remained_submit_log(replayable_point,
iterate_end_by_replayable_point);

View File

@ -57,6 +57,7 @@ class ObILogReplayService
{
public:
virtual int is_replay_done(const share::ObLSID &id, const palf::LSN &end_lsn, bool &is_done) = 0;
virtual int is_submit_task_clear(const share::ObLSID &id, bool &is_clear) = 0;
virtual int switch_to_follower(const share::ObLSID &id, const palf::LSN &begin_lsn) = 0;
virtual int switch_to_leader(const share::ObLSID &id) = 0;
};
@ -147,6 +148,7 @@ public:
int is_replay_done(const share::ObLSID &id,
const palf::LSN &end_lsn,
bool &is_done);
int is_submit_task_clear(const share::ObLSID &id, bool &is_clear);
int get_max_replayed_scn(const share::ObLSID &id, share::SCN &scn);
int submit_task(ObReplayServiceTask *task);
int update_replayable_point(const share::SCN &replayable_scn);

View File

@ -282,6 +282,20 @@ int ObReplayServiceSubmitTask::next_log(const SCN &replayable_point,
int ObReplayServiceSubmitTask::reset_iterator(PalfHandle &palf_handle, const LSN &begin_lsn)
{
int ret = OB_SUCCESS;
/*
* next_to_submit_lsn_ may be bigger than begin_lsn;
current replica is A
first revoke ObRoleChangeService::switch_follower_to_leader_():
T1 A invoke log_handler->get_end_lsn(end_lsn) and end_lsn is 100.
T2 another replica B switches to leader and submits log with lsn[101, 200].
T3 A invoke replay_service_->switch_to_leader(ls_id)
T4 A failed with invoking role_change_handler->switch_to_leader(cur_task_info_).
replica A replayed log [101, 150) between T1 and T3.
then
T5 replica A invoke ObRoleChangeService::leader_to_follower_forcedly_()
within leader_to_follower_forcedly_(), parameter end_lsn passed to
replay_service_->switch_to_follower(ls_id, end_lsn) is 100 while next_to_submit_lsn_ is 150.
*/
next_to_submit_lsn_ = std::max(next_to_submit_lsn_, begin_lsn);
if (OB_FAIL(palf_handle.seek(next_to_submit_lsn_, iterator_))) {
ret = OB_ERR_UNEXPECTED;
@ -815,6 +829,15 @@ void ObReplayStatus::switch_to_follower(const palf::LSN &begin_lsn)
role_ = FOLLOWER;
} while (0);
#ifdef ERRSIM
int tmp_ret = OB_E(EventTable::EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP) OB_SUCCESS;
if (OB_SUCCESS != tmp_ret) {
CLOG_LOG(INFO, "fake EN_REPLAY_SERVICE_SUBMIT_TASK_SLEEP ", KPC(this), K(begin_lsn));
SERVER_EVENT_SYNC_ADD("REPLAYSERVICE", "BEFORE_PUSH_SUBMIT_TASK");
}
DEBUG_SYNC(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK);
#endif
RLockGuard rguard(rwlock_);
if (!is_enabled_) {
// do nothing
@ -871,6 +894,20 @@ int ObReplayStatus::is_replay_done(const LSN &end_lsn,
return ret;
}
int ObReplayStatus::is_submit_task_clear(bool &is_clear) const
{
int ret = OB_SUCCESS;
is_clear = false;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "replay status has not been inited", K(ret));
} else {
RLockGuard rlock_guard(rwlock_);
is_clear = submit_log_task_.is_idle();
}
return ret;
}
int ObReplayStatus::update_end_offset(const LSN &lsn)
{
int ret = OB_SUCCESS;
@ -1376,7 +1413,7 @@ int ObReplayStatus::trigger_fetch_log()
{
int ret = OB_SUCCESS;
RLockGuard rlock_guard(rwlock_);
if (is_enabled_) {
if (is_enabled_ && need_submit_log()) {
if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_),
KPC(this), K(ret));
@ -1386,5 +1423,18 @@ int ObReplayStatus::trigger_fetch_log()
}
return ret;
}
int ObReplayStatus::check_can_replay() const
{
int ret = OB_SUCCESS;
{
RLockGuard guard(rolelock_);
if (FOLLOWER != role_) {
ret = OB_EAGAIN;
}
}
return ret;
}
} // namespace logservice
}

View File

@ -205,6 +205,11 @@ public:
return lease_.revoke();
}
bool is_idle() const
{
return lease_.is_idle();
}
ObReplayServiceTaskType get_type() const
{
return type_;
@ -477,8 +482,13 @@ public:
//
// @return : OB_SUCCESS : success
// OB_NOT_INIT: ObReplayStatus has not been inited
int is_replay_done(const palf::LSN &lsn,
bool &is_done);
int is_replay_done(const palf::LSN &lsn, bool &is_done);
//check whether submit task is in the global queue of ObLogReplayService
//@param [out] : true if submit task is not in the global queue
//@return : OB_SUCCESS : success
//OB_NOT_INIT: ObReplayStatus has not been inited
int is_submit_task_clear(bool &is_clear) const;
// 存在待回放的已提交日志任务
bool has_remained_replay_task() const;
// update right margin of logs that need to replay
@ -513,6 +523,8 @@ public:
ObLogReplayBuffer *&replay_log_buf,
bool &need_replay,
const int64_t replay_queue_idx);
//check whether can replay log so far
int check_can_replay() const;
void set_post_barrier_submitted(const palf::LSN &lsn);
int set_post_barrier_finished(const palf::LSN &lsn);
int trigger_fetch_log();
@ -571,6 +583,7 @@ private:
// 注销回调并清空任务
int disable_();
bool is_replay_enabled_() const;
private:
static const int64_t PENDING_COUNT_THRESHOLD = 100;
static const int64_t EAGAIN_COUNT_THRESHOLD = 50000;

View File

@ -570,6 +570,7 @@ class ObString;
ACT(BEFORE_WAIT_SYS_LS_END_SCN,)\
ACT(BEFORE_CREATE_CLONE_TENANT_END,)\
ACT(BEFORE_CALC_CONSISTENT_SCN,)\
ACT(REPLAY_SWITCH_TO_FOLLOWER_BEFORE_PUSH_SUBMIT_TASK,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);