The migration task now determines whether the local LS replica is a valid member by sending an RPC to the leader of that LS

This commit is contained in:
WenJinyu
2023-08-10 04:12:36 +00:00
committed by ob-robot
parent e0a0ce93bd
commit 3c91389ff2
11 changed files with 210 additions and 25 deletions

View File

@ -140,6 +140,7 @@ constexpr int OB_ERR_INTERVAL_PARTITION_ERROR = -4729;
constexpr int OB_FROZEN_INFO_ALREADY_EXIST = -4744;
constexpr int OB_CREATE_STANDBY_TENANT_FAILED = -4765;
constexpr int OB_LS_WAITING_SAFE_DESTROY = -4766;
constexpr int OB_LS_LOCK_CONFLICT = -4768;
constexpr int OB_ERR_PARSER_SYNTAX = -5006;
constexpr int OB_ERR_COLUMN_NOT_FOUND = -5031;
constexpr int OB_ERR_SYS_VARIABLE_UNKNOWN = -5044;

View File

@ -475,6 +475,7 @@ PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B7)
PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4B8)
PCODE_DEF(OB_HA_WAKEUP_TRANSFER_SERVICE, 0x4B9)
PCODE_DEF(OB_HA_SUBMIT_TX_LOG, 0x4BA)
PCODE_DEF(OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST, 0x4BB)
// sql, including executor

View File

@ -156,6 +156,7 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageWakeupTransferServiceP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObFetchLSMemberAndLearnerListP);
}
void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {

File diff suppressed because one or more lines are too long

View File

@ -658,6 +658,7 @@ DEFINE_ORACLE_ERROR_EXT(OB_ERR_UNKNOWN_SET_OPTION, -4764, -1, "HY000", "unknown
DEFINE_ERROR_EXT_DEP(OB_CREATE_STANDBY_TENANT_FAILED, -4765, -1, "HY000", "create standby tenant may fail", "create standby tenant may fail, %s");
DEFINE_ERROR_EXT_DEP(OB_LS_WAITING_SAFE_DESTROY, -4766, -1, "HY000", "ls waiting safe destory", "ls waiting safe destory, %s");
DEFINE_ERROR(OB_LS_NOT_LEADER, -4767, -1, "HY000", "log stream is not leader log stream");
DEFINE_ERROR_EXT_DEP(OB_LS_LOCK_CONFLICT, -4768, -1, "HY000", "ls lock conflict", "ls lock conflict, %s");
////////////////////////////////////////////////////////////////
// SQL & Schema specific error code, -5000 ~ -6000

View File

@ -2331,6 +2331,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_CREATE_STANDBY_TENANT_FAILED__USER_ERROR_MSG "create standby tenant may fail, %s"
#define OB_LS_WAITING_SAFE_DESTROY__USER_ERROR_MSG "ls waiting safe destory, %s"
#define OB_LS_NOT_LEADER__USER_ERROR_MSG "log stream is not leader log stream"
#define OB_LS_LOCK_CONFLICT__USER_ERROR_MSG "ls lock conflict, %s"
#define OB_ERR_PARSER_INIT__USER_ERROR_MSG "Failed to init SQL parser"
#define OB_ERR_PARSE_SQL__USER_ERROR_MSG "%s near \'%.*s\' at line %d"
#define OB_ERR_RESOLVE_SQL__USER_ERROR_MSG "Resolve error"
@ -4440,6 +4441,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_CREATE_STANDBY_TENANT_FAILED__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4765, create standby tenant may fail, %s"
#define OB_LS_WAITING_SAFE_DESTROY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4766, ls waiting safe destory, %s"
#define OB_LS_NOT_LEADER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4767, log stream is not leader log stream"
#define OB_LS_LOCK_CONFLICT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4768, ls lock conflict, %s"
#define OB_ERR_PARSER_INIT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5000, Failed to init SQL parser"
#define OB_ERR_PARSE_SQL__ORA_USER_ERROR_MSG "ORA-00900: %s near \'%.*s\' at line %d"
#define OB_ERR_RESOLVE_SQL__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5002, Resolve error"
@ -5989,7 +5991,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
extern int g_all_ob_errnos[2105];
extern int g_all_ob_errnos[2106];
const char *ob_error_name(const int oberr);
const char* ob_error_cause(const int oberr);

View File

@ -1305,8 +1305,11 @@ int ObStartCompleteMigrationTask::change_member_list_()
share::SCN ls_transfer_scn;
uint64_t cluster_version = 0;
palf::LogConfigVersion fake_config_version;
if (!is_inited_) {
#ifdef ERRSIM
ret = OB_E(EventTable::EN_SET_MEMBER_LIST_FAIL) OB_SUCCESS;
#endif
if (OB_FAIL(ret)) {
} else if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("start complete migration task do not init", K(ret));
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {

View File

@ -414,38 +414,45 @@ int ObStorageHADagUtils::check_self_is_valid_member(
bool &is_valid_member)
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLS *ls = nullptr;
logservice::ObLogHandler *log_handler = NULL;
common::ObMemberList member_list;
common::GlobalLearnerList learner_list;
int64_t paxos_replica_num = 0;
const ObAddr &self_addr = GCONF.self_addr_;
is_valid_member = false;
const uint64_t tenant_id = MTL_ID();
share::ObLocationService *location_service = nullptr;
const bool force_renew = true;
common::ObAddr leader_addr;
ObLSService *ls_service = nullptr;
storage::ObStorageRpc *storage_rpc = nullptr;
obrpc::ObFetchLSMemberAndLearnerListInfo member_info;
storage::ObStorageHASrcInfo src_info;
src_info.cluster_id_ = GCONF.cluster_id;
const ObAddr &self_addr = GCONF.self_addr_;
if (!ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check self in member list get invalid argument", K(ret), K(ls_id));
} else if (OB_FAIL(ObStorageHADagUtils::get_ls(ls_id, ls_handle))) {
LOG_WARN("failed to get ls", K(ret), K(ls_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
} else if (OB_ISNULL(location_service = GCTX.location_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls), K(ls_id));
} else if (OB_ISNULL(log_handler = ls->get_log_handler())) {
LOG_WARN("location service should not be NULL", K(ret), KP(location_service));
} else if (OB_FAIL(location_service->get_leader(src_info.cluster_id_, tenant_id, ls_id, force_renew, leader_addr))) {
LOG_WARN("fail to get ls leader server", K(ret), K(tenant_id), K(ls_id));
} else if (FALSE_IT(src_info.src_addr_ = leader_addr)) {
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log handler should not be NULL", K(ret));
} else if (OB_FAIL(log_handler->get_paxos_member_list_and_learner_list(member_list, paxos_replica_num, learner_list))) {
LOG_WARN("failed to get paxos member list and learner list", K(ret));
} else if (member_list.contains(self_addr)) {
LOG_WARN("ls service should not be NULL", K(ret), K(tenant_id), K(ls_id));
} else if (OB_ISNULL(storage_rpc = ls_service->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage rpc should not be NULL", K(ret), K(tenant_id), K(ls_id));
} else if (OB_FAIL(storage_rpc->fetch_ls_member_and_learner_list(tenant_id, ls_id, src_info, member_info))) {
LOG_WARN("failed to check ls is valid member", K(ret), K(tenant_id), K(ls_id));
} else if (member_info.member_list_.contains(self_addr)) {
is_valid_member = true;
} else if (!learner_list.contains(self_addr)) {
} else if (!member_info.learner_list_.contains(self_addr)) {
is_valid_member = false;
} else {
ObMember member;
if (OB_FAIL(learner_list.get_learner_by_addr(self_addr, member))) {
if (OB_FAIL(member_info.learner_list_.get_learner_by_addr(self_addr, member))) {
LOG_WARN("failed to get_learner_by_addr", K(ret));
} else if (member.is_migrating()) {
is_valid_member = false;
LOG_INFO("self is not valid member", K(ret), K(member), K(member_list), K(learner_list), K(self_addr), K(ls_id));
LOG_INFO("self is not valid member", K(ret), K(member), K(member_info), K(self_addr), K(ls_id));
} else {
is_valid_member = true;
}

View File

@ -2221,7 +2221,7 @@ int ObLS::set_ls_rebuild()
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, try_lock);
if (!lock_myself.locked()) {
ret = OB_EAGAIN;
ret = OB_LS_LOCK_CONFLICT;
LOG_WARN("try lock failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;

View File

@ -520,6 +520,43 @@ void ObFetchLSMemberListInfo::reset()
OB_SERIALIZE_MEMBER(ObFetchLSMemberListInfo, member_list_);
// Builds an argument that is_valid() rejects until the caller fills it in:
// tenant_id_ starts as OB_INVALID_ID and ls_id_ is value-initialized.
ObFetchLSMemberAndLearnerListArg::ObFetchLSMemberAndLearnerListArg()
  : tenant_id_(OB_INVALID_ID), ls_id_()
{}
// An argument is usable only when it names a real tenant and a valid log stream id.
bool ObFetchLSMemberAndLearnerListArg::is_valid() const
{
  const bool has_tenant = (tenant_id_ != OB_INVALID_ID);
  // ls_id_ is consulted only when the tenant id check has passed.
  return has_tenant && ls_id_.is_valid();
}
void ObFetchLSMemberAndLearnerListArg::reset()
{
tenant_id_ = OB_INVALID_ID;
}
// Lists tenant_id_ and ls_id_, in this order, as the members handled by OB_SERIALIZE_MEMBER for the request argument.
OB_SERIALIZE_MEMBER(ObFetchLSMemberAndLearnerListArg, tenant_id_, ls_id_);
// Builds a result object whose member list and learner list are both value-initialized.
ObFetchLSMemberAndLearnerListInfo::ObFetchLSMemberAndLearnerListInfo()
  : member_list_(), learner_list_()
{}
// The result counts as valid as soon as either list is valid; the learner
// list is only consulted when the member list is not valid.
bool ObFetchLSMemberAndLearnerListInfo::is_valid() const
{
  bool valid = member_list_.is_valid();
  if (!valid) {
    valid = learner_list_.is_valid();
  }
  return valid;
}
// Clears both lists so the object can be reused for another reply.
void ObFetchLSMemberAndLearnerListInfo::reset()
{
  member_list_.reset();   // paxos members
  learner_list_.reset();  // learners
}
// Lists member_list_ and learner_list_, in this order, as the members handled by OB_SERIALIZE_MEMBER for the reply.
OB_SERIALIZE_MEMBER(ObFetchLSMemberAndLearnerListInfo, member_list_, learner_list_);
ObCopySSTableMacroRangeInfoArg::ObCopySSTableMacroRangeInfoArg()
: tenant_id_(OB_INVALID_ID),
ls_id_(),
@ -1842,6 +1879,61 @@ int ObFetchLSMemberListP::process()
return ret;
}
// The processor has no state of its own to set up.
ObFetchLSMemberAndLearnerListP::ObFetchLSMemberAndLearnerListP() {}
// Serves the fetch-member-and-learner-list request.
// When this server's palf role for arg_.ls_id_ is strong leader, the paxos
// member list and learner list read from the log stream's log handler are
// deep-copied into result_. Any failed step leaves its error code in ret,
// which is returned to the caller.
int ObFetchLSMemberAndLearnerListP::process()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = arg_.tenant_id_;
const share::ObLSID &ls_id = arg_.ls_id_;
// NOTE(review): MTL_SWITCH presumably runs the body under the context of tenant_id — confirm against the macro definition.
MTL_SWITCH(tenant_id) {
ObLSService *ls_svr = NULL;
ObLSHandle ls_handle;
ObLS *ls = NULL;
logservice::ObLogHandler *log_handler = NULL;
common::ObMemberList member_list;
// Filled by the log handler call below but not copied into result_.
int64_t paxos_replica_num = 0;
logservice::ObLogService *log_service = nullptr;
ObRole role;
int64_t proposal_id = 0;
common::GlobalLearnerList learner_list;
// The checks below run strictly in order; the first failure ends the chain.
if (tenant_id != MTL_ID()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rpc get member list tenant not match", K(ret), K(tenant_id));
} else if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log service should not be NULL", K(ret), KP(log_service));
} else if (OB_FAIL(log_service->get_palf_role(ls_id, role, proposal_id))) {
LOG_WARN("failed to get role", K(ret), "arg", arg_);
} else if (!is_strong_leader(role)) {
// Only a strong leader answers; every other role is rejected with OB_PARTITION_NOT_LEADER.
ret = OB_PARTITION_NOT_LEADER;
LOG_WARN("ls is not leader, cannot get member list", K(ret), K(role), K(arg_));
} else if (OB_ISNULL(ls_svr = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls service should not be null", K(ret));
} else if (OB_FAIL(ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
LOG_WARN("failed to get ls", K(ret), K(ls_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be null", K(ret));
} else if (OB_ISNULL(log_handler = ls->get_log_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log handler should not be NULL", K(ret));
} else if (OB_FAIL(log_handler->get_paxos_member_list_and_learner_list(member_list, paxos_replica_num, learner_list))) {
LOG_WARN("failed to get paxos member list and learner list", K(ret));
} else if (OB_FAIL(result_.member_list_.deep_copy(member_list))) {
LOG_WARN("failed to assign member list", K(ret), K(member_list));
} else if (OB_FAIL(result_.learner_list_.deep_copy(learner_list))) {
LOG_WARN("failed to assign learner list", K(ret), K(learner_list));
}
}
return ret;
}
ObFetchSSTableMacroInfoP::ObFetchSSTableMacroInfoP(common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
{
@ -3796,5 +3888,26 @@ int ObStorageRpc::wakeup_transfer_service(
return ret;
}
// Asks the server at src_info.src_addr_ (in cluster src_info.cluster_id_) for
// the member list and learner list of log stream ls_id of tenant tenant_id.
//
// @param tenant_id    tenant that owns the log stream; must not be OB_INVALID_ID
// @param ls_id        log stream to query; must be valid
// @param src_info     destination address and cluster id of the RPC
// @param member_info  [out] filled with the reply on success
// @return OB_SUCCESS on success; OB_NOT_INIT when this object is not
//         initialized; OB_INVALID_ARGUMENT when the request or destination
//         address is invalid; otherwise the error returned by the RPC.
int ObStorageRpc::fetch_ls_member_and_learner_list(
    const uint64_t tenant_id,
    const share::ObLSID &ls_id,
    const ObStorageHASrcInfo &src_info,
    obrpc::ObFetchLSMemberAndLearnerListInfo &member_info)
{
  int ret = OB_SUCCESS;
  obrpc::ObFetchLSMemberAndLearnerListArg arg;
  arg.tenant_id_ = tenant_id;
  arg.ls_id_ = ls_id;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
  } else if (!arg.is_valid() || !src_info.src_addr_.is_valid()) {
    // Reject bad input locally instead of spending a network round trip on a
    // request that cannot be served.
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("fetch ls member and learner list get invalid argument", K(ret), K(tenant_id), K(ls_id),
        "src_addr", src_info.src_addr_);
  } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_)
                 .fetch_ls_member_and_learner_list(arg, member_info))) {
    LOG_WARN("fail to check ls is valid member", K(ret), K(tenant_id), K(ls_id));
  }
  return ret;
}
} // storage
} // oceanbase

View File

@ -299,6 +299,34 @@ public:
common::ObMemberList member_list_;
};
// Request argument of the fetch_ls_member_and_learner_list RPC: identifies the
// log stream whose member list and learner list are requested.
struct ObFetchLSMemberAndLearnerListArg
{
OB_UNIS_VERSION(2);
public:
ObFetchLSMemberAndLearnerListArg();
virtual ~ObFetchLSMemberAndLearnerListArg() {}
// True when tenant_id_ is not OB_INVALID_ID and ls_id_ is valid.
bool is_valid() const;
void reset();
TO_STRING_KV(K_(tenant_id), K_(ls_id));
// Tenant that owns the log stream; OB_INVALID_ID until set.
uint64_t tenant_id_;
// Log stream being queried.
share::ObLSID ls_id_;
};
// Reply of the fetch_ls_member_and_learner_list RPC: the member list and the
// learner list of the requested log stream.
struct ObFetchLSMemberAndLearnerListInfo
{
OB_UNIS_VERSION(2);
public:
ObFetchLSMemberAndLearnerListInfo();
virtual ~ObFetchLSMemberAndLearnerListInfo() {}
// True when at least one of the two lists is valid.
bool is_valid() const;
// Resets both lists.
void reset();
TO_STRING_KV(K_(member_list), K_(learner_list));
// Member list of the log stream.
common::ObMemberList member_list_;
// Learner list of the log stream.
common::GlobalLearnerList learner_list_;
};
struct ObCopySSTableMacroRangeInfoArg final
{
OB_UNIS_VERSION(2);
@ -743,7 +771,6 @@ public:
RPC_SS(PR5 lob_query, OB_LOB_QUERY, (ObLobQueryArg), common::ObDataBuffer);
RPC_SS(PR5 fetch_transfer_tablet_info, OB_FETCH_TRANSFER_TABLET_INFO, (ObTransferTabletInfoArg), common::ObDataBuffer);
RPC_SS(PR5 fetch_ls_view, OB_HA_FETCH_LS_VIEW, (ObCopyLSViewArg), common::ObDataBuffer);
RPC_S(PR5 fetch_ls_member_list, OB_HA_FETCH_LS_MEMBER_LIST, (ObFetchLSMemberListArg), ObFetchLSMemberListInfo);
RPC_S(PR5 fetch_ls_meta_info, OB_HA_FETCH_LS_META_INFO, (ObFetchLSMetaInfoArg), ObFetchLSMetaInfoResp);
RPC_S(PR5 fetch_ls_info, OB_HA_FETCH_LS_INFO, (ObCopyLSInfoArg), ObCopyLSInfo);
@ -763,6 +790,7 @@ public:
RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 wakeup_transfer_service, OB_HA_WAKEUP_TRANSFER_SERVICE, (ObStorageWakeupTransferServiceArg));
RPC_S(PR5 fetch_ls_member_and_learner_list, OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST, (ObFetchLSMemberAndLearnerListArg), ObFetchLSMemberAndLearnerListInfo);
};
template <ObRpcPacketCode RPC_CODE>
@ -858,6 +886,16 @@ protected:
int process();
};
// RPC processor for the OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST packet code.
class ObFetchLSMemberAndLearnerListP:
public ObStorageRpcProxy::Processor<OB_HA_FETCH_LS_MEMBER_AND_LEARNER_LIST>
{
public:
explicit ObFetchLSMemberAndLearnerListP();
virtual ~ObFetchLSMemberAndLearnerListP() { }
protected:
// Handles one request and returns an OB error code.
int process();
};
class ObFetchSSTableMacroInfoP :
public ObStorageStreamRpcP<OB_HA_FETCH_SSTABLE_MACRO_INFO>
{
@ -1332,6 +1370,11 @@ public:
virtual int wakeup_transfer_service(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info);
virtual int fetch_ls_member_and_learner_list(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
const ObStorageHASrcInfo &src_info,
obrpc::ObFetchLSMemberAndLearnerListInfo &member_info);
private:
bool is_inited_;