Support force set ls as single member for ob_admin.
This commit is contained in:
parent
f5ae210882
commit
0bce93fe67
@ -911,6 +911,7 @@ PCODE_DEF(OB_LOG_FLASHBACK_CMD, 0x151C)
|
||||
PCODE_DEF(OB_LOG_GET_PALF_STAT, 0x151D)
|
||||
PCODE_DEF(OB_LOG_NOTIFY_FETCH_LOG, 0x151E)
|
||||
PCODE_DEF(OB_LOG_GET_STAT, 0x151F)
|
||||
PCODE_DEF(OB_LOG_FORCE_SET_LS_AS_SINGLE_REPLICA, 0x1520)
|
||||
|
||||
// 1531-1550 for obesi
|
||||
// PCODE_DEF(OB_ESI_IS_EXIST, 0x1531)
|
||||
|
@ -254,6 +254,9 @@ int ConfigChangeCmdHandler::handle_config_change_cmd(const LogConfigChangeCmd &r
|
||||
CLOG_LOG(ERROR, "get_reporter failed", K(req.palf_id_));
|
||||
} else {
|
||||
switch (req.cmd_type_) {
|
||||
case FORCE_SINGLE_MEMBER_CMD:
|
||||
ret = palf_handle_->force_set_as_single_replica();
|
||||
break;
|
||||
case CHANGE_REPLICA_NUM_CMD:
|
||||
ret = palf_handle_->change_replica_num(req.curr_member_list_, req.curr_replica_num_,
|
||||
req.new_replica_num_, req.timeout_us_);
|
||||
|
@ -35,6 +35,7 @@ enum LogConfigChangeCmdType {
|
||||
REMOVE_LEARNER_CMD,
|
||||
SWITCH_TO_ACCEPTOR_CMD,
|
||||
SWITCH_TO_LEARNER_CMD,
|
||||
FORCE_SINGLE_MEMBER_CMD,
|
||||
};
|
||||
|
||||
inline const char *log_config_change_cmd2str(const LogConfigChangeCmdType state)
|
||||
|
@ -598,6 +598,33 @@ int ObLogHandler::change_replica_num(const common::ObMemberList &member_list,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// @desc: force_set_as_single_replica interface
|
||||
// | 1.force_set_as_single_replica()
|
||||
// V
|
||||
// [any_member] ----- 2.one_stage_config_change_(FORCE_SINGLE_MEMBER)
|
||||
int ObLogHandler::force_set_as_single_replica()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObSpinLockGuard deps_guard(deps_lock_);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else {
|
||||
common::ObMember dummy_member;
|
||||
common::ObMemberList dummy_member_list;
|
||||
int64_t dummy_replica_num = -1, new_replica_num = 1;
|
||||
const int64_t timeout_us = 10 * 1000 * 1000L;
|
||||
LogConfigChangeCmd req(self_, id_, dummy_member_list, dummy_replica_num, new_replica_num,
|
||||
FORCE_SINGLE_MEMBER_CMD, timeout_us);
|
||||
ConfigChangeCmdHandler cmd_handler(&palf_handle_);
|
||||
if (OB_FAIL(cmd_handler.handle_config_change_cmd(req))) {
|
||||
CLOG_LOG(WARN, "handle_config_change_cmd failed", KR(ret), K_(id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// @desc: add_member interface
|
||||
// | 1.add_member()
|
||||
// V
|
||||
|
@ -101,6 +101,7 @@ public:
|
||||
const int64_t curr_replica_num,
|
||||
const int64_t new_replica_num,
|
||||
const int64_t timeout_us) = 0;
|
||||
virtual int force_set_as_single_replica() = 0;
|
||||
virtual int add_member(const common::ObMember &member,
|
||||
const int64_t paxos_replica_num,
|
||||
const int64_t timeout_us) = 0;
|
||||
@ -341,6 +342,12 @@ public:
|
||||
const int64_t curr_replica_num,
|
||||
const int64_t new_replica_num,
|
||||
const int64_t timeout_us) override final;
|
||||
// @brief: force set self as single replica.
|
||||
// @return
|
||||
// - OB_SUCCESS: change_replica_num successfully
|
||||
// - OB_TIMEOUT: change_replica_num timeout
|
||||
// - other: bug
|
||||
virtual int force_set_as_single_replica() override final;
|
||||
// @brief, add a member to paxos group, can be called in any member
|
||||
// @param[in] common::ObMember &member: member which will be added
|
||||
// @param[in] const int64_t paxos_replica_num: replica number of paxos group after adding 'member'
|
||||
|
@ -647,11 +647,21 @@ int LogConfigMgr::is_state_changed_(bool &need_rlock, bool &need_wlock) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogConfigMgr::check_config_version_matches_state_(const LogConfigVersion &config_version) const
|
||||
int LogConfigMgr::check_config_version_matches_state_(const LogConfigChangeType &type,
|
||||
const LogConfigVersion &config_version) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (ConfigChangeState::INIT == state_) {
|
||||
ret = (config_version.is_valid())? OB_ERR_UNEXPECTED: OB_SUCCESS;
|
||||
if (config_version.is_valid()) {
|
||||
if (FORCE_SINGLE_MEMBER != type) {
|
||||
// For force set single member case, this may occur. After updating election's member_list,
|
||||
// Self may be elected and finish reconfirm quickly, which will reset config_mgr state and
|
||||
// write start_working log successfully.
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
PALF_LOG(INFO, "Another config change(maybe self reconfirm) has finished during force set single member", K_(palf_id), K_(self), K_(state), K(type), K(config_version), K_(log_ms_meta));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ret = (config_version != log_ms_meta_.curr_.config_version_)? OB_EAGAIN: OB_SUCCESS;
|
||||
}
|
||||
@ -714,7 +724,7 @@ int LogConfigMgr::change_config(const LogConfigChangeArgs &args,
|
||||
} else {
|
||||
ret = change_config_(args, proposal_id, election_epoch, config_version);
|
||||
PALF_LOG(INFO, "config_change stat", K_(palf_id), K_(self), K(args), K(proposal_id),
|
||||
K_(prev_log_proposal_id), K_(prev_lsn), K_(prev_mode_pid), K_(state),
|
||||
K_(prev_log_proposal_id), K_(prev_lsn), K_(prev_mode_pid), K_(state), K(config_version),
|
||||
K_(persistent_config_version), K_(ms_ack_list), K_(resend_config_version),
|
||||
K_(resend_log_list), K_(log_ms_meta), K_(last_submit_config_log_time_us));
|
||||
}
|
||||
@ -737,7 +747,8 @@ int LogConfigMgr::change_config_(const LogConfigChangeArgs &args,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// args may be invalid when background retry config_change, so don't check it here
|
||||
if (false == is_leader_for_config_change_(args.type_, proposal_id, election_epoch)) {
|
||||
if (need_exec_on_leader_(args.type_)
|
||||
&& false == is_leader_for_config_change_(args.type_, proposal_id, election_epoch)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
PALF_LOG(WARN, "not leader, can't change member", KR(ret), K_(palf_id), K_(self),
|
||||
"role", state_mgr_->get_role(), "state", state_mgr_->get_state());
|
||||
@ -746,7 +757,7 @@ int LogConfigMgr::change_config_(const LogConfigChangeArgs &args,
|
||||
ret = OB_EAGAIN;
|
||||
PALF_LOG(WARN, "is changing access_mode, try again", KR(ret), K_(palf_id), K_(self),
|
||||
"role", state_mgr_->get_role(), "state", state_mgr_->get_state());
|
||||
} else if (OB_FAIL(check_config_version_matches_state_(config_version))) {
|
||||
} else if (OB_FAIL(check_config_version_matches_state_(args.type_, config_version))) {
|
||||
PALF_LOG(WARN, "config_version does not match with state, try again", KR(ret), K_(palf_id), K_(self),
|
||||
K(config_version), K_(state), K_(log_ms_meta));
|
||||
} else {
|
||||
@ -1130,6 +1141,10 @@ int LogConfigMgr::check_config_change_args_(const LogConfigChangeArgs &args, boo
|
||||
{
|
||||
break;
|
||||
}
|
||||
case FORCE_SINGLE_MEMBER:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
@ -1200,7 +1215,8 @@ int LogConfigMgr::check_args_and_generate_config(const LogConfigChangeArgs &args
|
||||
SpinLockGuard guard(lock_);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (false == is_leader_for_config_change_(args.type_, proposal_id, election_epoch)) {
|
||||
} else if (need_exec_on_leader_(args.type_)
|
||||
&& false == is_leader_for_config_change_(args.type_, proposal_id, election_epoch)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
PALF_LOG(WARN, "is_leader_for_config_change_ return false", K(ret), K_(palf_id), K_(self),
|
||||
K(args.type_), K(proposal_id), K(election_epoch));
|
||||
@ -1301,10 +1317,17 @@ int LogConfigMgr::append_config_meta_(const int64_t curr_proposal_id,
|
||||
if (INVALID_PROPOSAL_ID == curr_proposal_id || !args.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid argument", KR(ret), K_(palf_id), K_(self), K(args), K(curr_proposal_id));
|
||||
} else if (!(state_mgr_->get_leader() == self_ && state_mgr_->get_proposal_id() == curr_proposal_id)) {
|
||||
} else if (need_exec_on_leader_(args.type_) &&
|
||||
!(state_mgr_->get_leader() == self_ && state_mgr_->get_proposal_id() == curr_proposal_id)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
PALF_LOG(WARN, "leader has switched during config changing", KR(ret), K_(palf_id), K_(self),
|
||||
"role", state_mgr_->get_role(), K(curr_proposal_id), "proposal_id", state_mgr_->get_proposal_id());
|
||||
"role", state_mgr_->get_role(), K(curr_proposal_id), "proposal_id", state_mgr_->get_proposal_id(),
|
||||
"leader", state_mgr_->get_leader());
|
||||
} else if (false == need_exec_on_leader_(args.type_) && !(state_mgr_->get_proposal_id() == curr_proposal_id)) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
PALF_LOG(WARN, "proposal_id has switched during config changing", KR(ret), K_(palf_id), K_(self),
|
||||
"role", state_mgr_->get_role(), K(curr_proposal_id), "proposal_id", state_mgr_->get_proposal_id(),
|
||||
"leader", state_mgr_->get_leader());
|
||||
} else if (OB_FAIL(check_config_change_args_(args, is_already_finished))) {
|
||||
PALF_LOG(WARN, "check_config_change_args_ failed", K(ret), K_(palf_id), K_(self), K_(log_ms_meta), K(args));
|
||||
} else if (is_already_finished) {
|
||||
@ -1516,6 +1539,14 @@ int LogConfigMgr::generate_new_config_info_(const int64_t proposal_id,
|
||||
new_config_info.arbitration_member_.reset();
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && FORCE_SINGLE_MEMBER == cc_type) {
|
||||
// force set single member
|
||||
new_config_info.log_sync_memberlist_.reset();
|
||||
new_config_info.degraded_learnerlist_.reset();
|
||||
new_config_info.arbitration_member_.reset();
|
||||
new_config_info.log_sync_memberlist_.add_member(member);
|
||||
new_config_info.log_sync_replica_num_ = new_log_sync_replica_num;
|
||||
}
|
||||
// learnerlist add
|
||||
if (OB_SUCC(ret) && is_add_learner_list(cc_type)) {
|
||||
if (DEGRADE_ACCEPTOR_TO_LEARNER == cc_type) {
|
||||
|
@ -67,6 +67,7 @@ enum LogConfigChangeType
|
||||
DEGRADE_ACCEPTOR_TO_LEARNER,
|
||||
UPGRADE_LEARNER_TO_ACCEPTOR,
|
||||
STARTWORKING,
|
||||
FORCE_SINGLE_MEMBER,
|
||||
};
|
||||
|
||||
typedef common::ObArrayHashMap<common::ObAddr, common::ObRegion> LogMemberRegionMap;
|
||||
@ -115,12 +116,17 @@ inline bool is_upgrade_or_degrade(const LogConfigChangeType type)
|
||||
|
||||
inline bool is_use_replica_num_args(const LogConfigChangeType type)
|
||||
{
|
||||
return ADD_MEMBER == type || REMOVE_MEMBER == type || CHANGE_REPLICA_NUM == type;
|
||||
return ADD_MEMBER == type || REMOVE_MEMBER == type || CHANGE_REPLICA_NUM == type || FORCE_SINGLE_MEMBER == type;
|
||||
}
|
||||
|
||||
inline bool need_exec_on_leader_(const LogConfigChangeType type)
|
||||
{
|
||||
return (FORCE_SINGLE_MEMBER != type);
|
||||
}
|
||||
|
||||
inline bool is_may_change_replica_num(const LogConfigChangeType type)
|
||||
{
|
||||
return is_add_member_list(type) || is_remove_member_list(type) || CHANGE_REPLICA_NUM == type;
|
||||
return is_add_member_list(type) || is_remove_member_list(type) || CHANGE_REPLICA_NUM == type || FORCE_SINGLE_MEMBER == type;
|
||||
}
|
||||
|
||||
struct LogConfigChangeArgs
|
||||
@ -368,7 +374,7 @@ private:
|
||||
LogConfigVersion &init_config_version);
|
||||
bool can_memberlist_majority_(const int64_t new_member_list_len, const int64_t new_replica_num) const;
|
||||
int check_config_change_args_(const LogConfigChangeArgs &args, bool &is_already_finished) const;
|
||||
int check_config_version_matches_state_(const LogConfigVersion &config_version) const;
|
||||
int check_config_version_matches_state_(const LogConfigChangeType &type, const LogConfigVersion &config_version) const;
|
||||
int generate_new_config_info_(const int64_t proposal_id,
|
||||
const LogConfigChangeArgs &args,
|
||||
LogConfigInfo &new_config_info) const;
|
||||
|
@ -308,6 +308,11 @@ int PalfHandle::change_replica_num(const common::ObMemberList &member_list,
|
||||
CHECK_VALID;
|
||||
return palf_handle_impl_->change_replica_num(member_list, curr_replica_num, new_replica_num, timeout_us);
|
||||
}
|
||||
int PalfHandle::force_set_as_single_replica()
|
||||
{
|
||||
CHECK_VALID;
|
||||
return palf_handle_impl_->force_set_as_single_replica();
|
||||
}
|
||||
int PalfHandle::get_ack_info_array(LogMemberAckInfoList &ack_info_array,
|
||||
common::GlobalLearnerList °raded_list) const
|
||||
{
|
||||
|
@ -198,6 +198,8 @@ public:
|
||||
const int64_t curr_replica_num,
|
||||
const int64_t new_replica_num,
|
||||
const int64_t timeout_us);
|
||||
// @brief: force set self as single member
|
||||
int force_set_as_single_replica();
|
||||
|
||||
int get_ack_info_array(LogMemberAckInfoList &ack_info_array,
|
||||
common::GlobalLearnerList °raded_list) const;
|
||||
|
@ -555,6 +555,27 @@ int PalfHandleImpl::config_change_pre_check(const ObAddr &server,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfHandleImpl::force_set_as_single_replica()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
PALF_LOG(WARN, "PalfHandleImpl not init", KR(ret), KPC(this));
|
||||
} else {
|
||||
const int64_t now_time_us = common::ObTimeUtility::current_time();
|
||||
const ObMember member(self_, now_time_us);
|
||||
const int64_t new_replica_num = 1;
|
||||
LogConfigChangeArgs args(member, new_replica_num, FORCE_SINGLE_MEMBER);
|
||||
const int64_t timeout_us = 10 * 1000 * 1000L;
|
||||
if (OB_FAIL(one_stage_config_change_(args, timeout_us))) {
|
||||
PALF_LOG(WARN, "one_stage_config_change_ failed", KR(ret), KPC(this), K(member), K(new_replica_num));
|
||||
} else {
|
||||
PALF_EVENT("force_set_as_single_replica success", palf_id_, KR(ret), KPC(this), K(member), K(new_replica_num));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfHandleImpl::change_replica_num(
|
||||
const common::ObMemberList &member_list,
|
||||
const int64_t curr_replica_num,
|
||||
|
@ -277,6 +277,8 @@ public:
|
||||
const int64_t curr_replica_num,
|
||||
const int64_t new_replica_num,
|
||||
const int64_t timeout_us) = 0;
|
||||
// @brief: force set self as single replica.
|
||||
virtual int force_set_as_single_replica() = 0;
|
||||
|
||||
// @brief, add a member into paxos group
|
||||
// @param[in] common::ObMember &member: member which will be added
|
||||
@ -663,6 +665,7 @@ public:
|
||||
int get_global_learner_list(common::GlobalLearnerList &learner_list) const override final;
|
||||
int get_paxos_member_list(common::ObMemberList &member_list, int64_t &paxos_replica_num) const override final;
|
||||
int get_election_leader(common::ObAddr &addr) const;
|
||||
int force_set_as_single_replica() override final;
|
||||
int change_replica_num(const common::ObMemberList &member_list,
|
||||
const int64_t curr_replica_num,
|
||||
const int64_t new_replica_num,
|
||||
|
@ -2206,6 +2206,18 @@ int ObRpcGetLSSyncScnP::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObForceSetLSAsSingleReplicaP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(gctx_.ob_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(WARN, "ob_service is null", KR(ret));
|
||||
} else if (OB_FAIL(gctx_.ob_service_->force_set_ls_as_single_replica(arg_))) {
|
||||
COMMON_LOG(WARN, "force_set_ls_as_single_replica failed", KR(ret), K(arg_));
|
||||
} else {}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRefreshTenantInfoP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -211,6 +211,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_BACKUP_DEST_CONNECTIVITY, ObRpcCheckBackupDe
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_GET_LS_ACCESS_MODE, ObRpcGetLSAccessModeP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CHANGE_LS_ACCESS_MODE, ObRpcChangeLSAccessModeP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_GET_LS_SYNC_SCN, ObRpcGetLSSyncScnP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_LOG_FORCE_SET_LS_AS_SINGLE_REPLICA, ObForceSetLSAsSingleReplicaP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_ESTIMATE_TABLET_BLOCK_COUNT, ObEstimateTabletBlockCountP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_DDL_CHECK_TABLET_MERGE_STATUS, ObRpcDDLCheckTabletMergeStatusP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_REFRESH_TENANT_INFO, ObRefreshTenantInfoP);
|
||||
|
@ -2581,6 +2581,51 @@ int ObService::get_ls_sync_scn(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::force_set_ls_as_single_replica(
|
||||
const ObForceSetLSAsSingleReplicaArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
ObLSService *ls_svr = nullptr;
|
||||
LOG_INFO("force_set_ls_as_single_replica", K(arg));
|
||||
|
||||
if (OB_UNLIKELY(!inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret));
|
||||
} else if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("arg is invaild", KR(ret), K(arg));
|
||||
} else if (arg.get_tenant_id() != MTL_ID() && OB_FAIL(guard.switch_to(arg.get_tenant_id()))) {
|
||||
LOG_WARN("switch tenant failed", KR(ret), K(arg));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ls_svr = MTL(ObLSService*);
|
||||
logservice::ObLogService *log_ls_svr = MTL(logservice::ObLogService*);
|
||||
ObLS *ls = nullptr;
|
||||
ObLSHandle handle;
|
||||
logservice::ObLogHandler *log_handler = NULL;
|
||||
logservice::ObLogRestoreHandler *restore_handler = NULL;
|
||||
ObLSID ls_id = arg.get_ls_id();
|
||||
if (OB_ISNULL(ls_svr) || OB_ISNULL(log_ls_svr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "should not be null", KR(ret), KP(ls_svr), KP(log_ls_svr));
|
||||
} else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||
COMMON_LOG(WARN, "get ls failed", KR(ret), K(ls_id));
|
||||
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "ls should not be null", KR(ret), K(ls_id));
|
||||
} else if (OB_ISNULL(log_handler = ls->get_log_handler())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("log_handler is null", KR(ret), K(ls_id), KP(ls));
|
||||
} else if (OB_FAIL(log_handler->force_set_as_single_replica())) {
|
||||
LOG_WARN("failed to force_set_as_single_replica", KR(ret), K(ls_id), KPC(ls));
|
||||
}
|
||||
}
|
||||
LOG_INFO("finish force_set_ls_as_single_replica", KR(ret), K(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::refresh_tenant_info(
|
||||
const ObRefreshTenantInfoArg &arg,
|
||||
ObRefreshTenantInfoRes &result)
|
||||
|
@ -146,6 +146,7 @@ public:
|
||||
int delete_backup_ls_task(const obrpc::ObLSBackupCleanArg &arg);
|
||||
int get_ls_sync_scn(const obrpc::ObGetLSSyncScnArg &arg,
|
||||
obrpc::ObGetLSSyncScnRes &result);
|
||||
int force_set_ls_as_single_replica(const obrpc::ObForceSetLSAsSingleReplicaArg &arg);
|
||||
int refresh_tenant_info(const obrpc::ObRefreshTenantInfoArg &arg,
|
||||
obrpc::ObRefreshTenantInfoRes &result);
|
||||
int estimate_partition_rows(const obrpc::ObEstPartArg &arg,
|
||||
|
@ -88,6 +88,7 @@ void oceanbase::observer::init_srv_xlator_for_partition(ObSrvRpcXlator *xlator)
|
||||
RPC_PROCESSOR(ObDumpSingleTxDataP, gctx_);
|
||||
RPC_PROCESSOR(ObForceSwitchILogFileP, gctx_);
|
||||
RPC_PROCESSOR(ObForceSetAllAsSingleReplicaP, gctx_);
|
||||
RPC_PROCESSOR(ObForceSetLSAsSingleReplicaP, gctx_);
|
||||
// RPC_PROCESSOR(ObSplitDestPartitionRequestP, gctx_.par_ser_);
|
||||
// RPC_PROCESSOR(ObReplicaSplitProgressRequestP, gctx_.par_ser_);
|
||||
// RPC_PROCESSOR(ObCheckMemberMajorSSTableEnoughP, gctx_.par_ser_);
|
||||
|
@ -5365,6 +5365,36 @@ OB_SERIALIZE_MEMBER(TenantServerUnitConfig,
|
||||
if_not_grant_,
|
||||
unit_id_);
|
||||
|
||||
int ObForceSetLSAsSingleReplicaArg::init(const uint64_t tenant_id, const share::ObLSID &ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|
||||
|| !ls_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObForceSetLSAsSingleReplicaArg::is_valid() const
|
||||
{
|
||||
return OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid();
|
||||
}
|
||||
|
||||
int ObForceSetLSAsSingleReplicaArg::assign(const ObForceSetLSAsSingleReplicaArg &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
tenant_id_ = other.tenant_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
OB_SERIALIZE_MEMBER(ObForceSetLSAsSingleReplicaArg, tenant_id_, ls_id_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObGetLSSyncScnArg, tenant_id_, ls_id_, check_sync_to_latest_);
|
||||
|
||||
bool ObGetLSSyncScnArg::is_valid() const
|
||||
|
@ -6114,6 +6114,32 @@ public:
|
||||
common::ObClusterRole cluster_role_;
|
||||
};
|
||||
|
||||
struct ObForceSetLSAsSingleReplicaArg
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObForceSetLSAsSingleReplicaArg(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_() {}
|
||||
~ObForceSetLSAsSingleReplicaArg() {}
|
||||
bool is_valid() const;
|
||||
int init(const uint64_t tenant_id, const share::ObLSID &ls_id);
|
||||
int assign(const ObForceSetLSAsSingleReplicaArg &other);
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id));
|
||||
|
||||
uint64_t get_tenant_id() const
|
||||
{
|
||||
return tenant_id_;
|
||||
}
|
||||
share::ObLSID get_ls_id() const
|
||||
{
|
||||
return ls_id_;
|
||||
}
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObForceSetLSAsSingleReplicaArg);
|
||||
private:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
};
|
||||
|
||||
struct ObGetLSSyncScnArg
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
@ -184,6 +184,7 @@ public:
|
||||
RPC_AP(PR1 get_ls_sync_scn, OB_GET_LS_SYNC_SCN, (obrpc::ObGetLSSyncScnArg), obrpc::ObGetLSSyncScnRes);
|
||||
RPC_AP(PR5 refresh_tenant_info, OB_REFRESH_TENANT_INFO, (obrpc::ObRefreshTenantInfoArg), obrpc::ObRefreshTenantInfoRes);
|
||||
RPC_S(PR5 sync_rewrite_rules, OB_SYNC_REWRITE_RULES, (ObSyncRewriteRuleArg));
|
||||
RPC_S(PR5 force_set_ls_as_single_replica, OB_LOG_FORCE_SET_LS_AS_SINGLE_REPLICA, (obrpc::ObForceSetLSAsSingleReplicaArg));
|
||||
}; // end of class ObSrvRpcProxy
|
||||
|
||||
} // end of namespace rpc
|
||||
|
@ -333,6 +333,35 @@ DEF_COMMAND(SERVER, force_set_all_as_single_replica, 1, "#force set all as singl
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_COMMAND(SERVER, force_set_ls_as_single_replica, 1, "tenant_id:ls_id # force set as single replica")
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
obrpc::ObForceSetLSAsSingleReplicaArg arg;
|
||||
string arg_str;
|
||||
if (cmd_ == action_name_) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
ADMIN_WARN("## Error: should provide tenant_id:ls_id");
|
||||
} else {
|
||||
arg_str = cmd_.substr(action_name_.length() + 1);
|
||||
}
|
||||
uint64_t tenant_id = 0;
|
||||
int64_t ls_id_val = -1;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (2 != sscanf(arg_str.c_str(), "%ld:%ld", &tenant_id, &ls_id_val)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
COMMON_LOG(WARN, "invalid arg", K(ret));
|
||||
} else {
|
||||
share::ObLSID ls_id(ls_id_val);
|
||||
if (OB_SUCCESS != (ret = arg.init(tenant_id, ls_id))) {
|
||||
COMMON_LOG(ERROR, "init arg failed", K(ret), K(arg), K(tenant_id), K(ls_id));
|
||||
} else if (OB_SUCCESS != (ret = client_->force_set_ls_as_single_replica(arg))) {
|
||||
COMMON_LOG(ERROR, "send req failed", K(ret), K(arg));
|
||||
}
|
||||
}
|
||||
COMMON_LOG(INFO, "force_set_ls_as_single_replica", K(ret), K(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_COMMAND(SERVER, force_create_sys_table, 1, "tenant_id:table_id:last_replay_log_id # force create sys table")
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -267,6 +267,10 @@ public:
|
||||
UNUSEDx(member_list, curr_replica_num, new_replica_num, timeout_us);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int force_set_as_single_replica()
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int add_member(const common::ObMember &member,
|
||||
const int64_t paxos_replica_num,
|
||||
const int64_t timeout_ns)
|
||||
|
Loading…
x
Reference in New Issue
Block a user