fix switchover during upgrade
This commit is contained in:
@ -1601,7 +1601,7 @@ int ObRpcGetLSAccessModeP::process()
|
||||
const SCN ref_scn = SCN::min_scn();
|
||||
if (OB_FAIL(log_handler->get_access_mode(mode_version, mode))) {
|
||||
LOG_WARN("failed to get access mode", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(result_.init(tenant_id, ls_id, mode_version, mode, ref_scn, GCTX.self_addr()))) {
|
||||
} else if (OB_FAIL(result_.init(tenant_id, ls_id, mode_version, mode, ref_scn))) {
|
||||
LOG_WARN("failed to init res", KR(ret), K(tenant_id), K(ls_id), K(mode_version), K(mode));
|
||||
} else if (OB_FAIL(log_ls_svr->get_palf_role(ls_id, role, second_proposal_id))) {
|
||||
COMMON_LOG(WARN, "failed to get palf role", KR(ret), K(ls_id));
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
#include "lib/utility/ob_print_utils.h"// TO_STRING_KV
|
||||
#include "rootserver/ob_cluster_event.h"// CLUSTER_EVENT_ADD_CONTROL
|
||||
#include "rootserver/ob_rs_event_history_table_operator.h" // ROOTSERVICE_EVENT_ADD
|
||||
#include "share/ob_rpc_struct.h"//ObLSAccessModeInfo
|
||||
#include "share/ob_rpc_struct.h"//ObObLSAccessModeInfo
|
||||
#include "observer/ob_server_struct.h"//GCTX
|
||||
#include "share/location_cache/ob_location_service.h"//get ls leader
|
||||
#include "share/ob_global_stat_proxy.h"//ObGlobalStatProxy
|
||||
@ -40,52 +40,7 @@ const char* const ObTenantRoleTransitionConstants::SWITCH_TO_PRIMARY_LOG_MOD_STR
|
||||
const char* const ObTenantRoleTransitionConstants::SWITCH_TO_STANDBY_LOG_MOD_STR = "SWITCH_TO_STANDBY";
|
||||
const char* const ObTenantRoleTransitionConstants::RESTORE_TO_STANDBY_LOG_MOD_STR = "RESTORE_TO_STANDBY";
|
||||
|
||||
////////////LSAccessModeInfo/////////////////
|
||||
int ObTenantRoleTransitionService::LSAccessModeInfo::init(
|
||||
uint64_t tenant_id, const ObLSID &ls_id, const ObAddr &addr,
|
||||
const int64_t mode_version,
|
||||
const palf::AccessMode &access_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|
||||
|| !ls_id.is_valid() || !addr.is_valid()
|
||||
|| palf::INVALID_PROPOSAL_ID == mode_version
|
||||
|| palf::AccessMode::INVALID_ACCESS_MODE == access_mode)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(addr),
|
||||
K(access_mode), K(mode_version));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
leader_addr_ = addr;
|
||||
mode_version_ = mode_version;
|
||||
access_mode_ = access_mode;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantRoleTransitionService::LSAccessModeInfo::assign(const LSAccessModeInfo &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this != &other) {
|
||||
reset();
|
||||
tenant_id_ = other.tenant_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
leader_addr_ = other.leader_addr_;
|
||||
mode_version_ = other.mode_version_;
|
||||
access_mode_ = other.access_mode_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTenantRoleTransitionService::LSAccessModeInfo::reset()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_TENANT_ID;
|
||||
ls_id_.reset();
|
||||
leader_addr_.reset();
|
||||
access_mode_ = palf::AccessMode::INVALID_ACCESS_MODE;
|
||||
mode_version_ = palf::INVALID_PROPOSAL_ID;
|
||||
}
|
||||
////////////ObTenantRoleTransitionService//////////////
|
||||
int ObTenantRoleTransitionService::check_inner_stat()
|
||||
{
|
||||
@ -562,8 +517,8 @@ int ObTenantRoleTransitionService::change_ls_access_mode_(palf::AccessMode targe
|
||||
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.internal_sql_execute_timeout))) {
|
||||
LOG_WARN("failed to set default timeout", KR(ret));
|
||||
} else {
|
||||
ObArray<LSAccessModeInfo> ls_mode_info;
|
||||
ObArray<LSAccessModeInfo> need_change_info;
|
||||
ObArray<ObLSAccessModeInfo> ls_mode_info;
|
||||
ObArray<ObLSAccessModeInfo> need_change_info;
|
||||
//ignore error, try until success
|
||||
bool need_retry = true;
|
||||
do {
|
||||
@ -578,8 +533,8 @@ int ObTenantRoleTransitionService::change_ls_access_mode_(palf::AccessMode targe
|
||||
LOG_WARN("failed to get ls access mode", KR(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ls_mode_info.count(); ++i) {
|
||||
const LSAccessModeInfo &info = ls_mode_info.at(i);
|
||||
if (info.access_mode_ == target_access_mode) {
|
||||
const ObLSAccessModeInfo &info = ls_mode_info.at(i);
|
||||
if (info.get_access_mode() == target_access_mode) {
|
||||
//nothing, no need change
|
||||
} else if (OB_FAIL(need_change_info.push_back(info))) {
|
||||
LOG_WARN("failed to assign", KR(ret), K(i), K(info));
|
||||
@ -655,7 +610,7 @@ int do_nonblock_renew(const ARRAY_L &array_l, const ARRAY_R &array_r, const uint
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<LSAccessModeInfo> &ls_access_info)
|
||||
int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<ObLSAccessModeInfo> &ls_access_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ls_access_info.reset();
|
||||
@ -720,7 +675,6 @@ int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<LSAccessModeInfo
|
||||
K(rpc_count), K(return_code_array), "arg count",
|
||||
proxy.get_args().count());
|
||||
} else {
|
||||
LSAccessModeInfo info;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
|
||||
ret = return_code_array.at(i);
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -730,13 +684,10 @@ int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<LSAccessModeInfo
|
||||
if (OB_ISNULL(result)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is null", KR(ret), K(i));
|
||||
} else if (OB_FAIL(info.init(tenant_id_, result->get_ls_id(),
|
||||
result->get_addr(), result->get_mode_version(), result->get_access_mode()))) {
|
||||
LOG_WARN("failed to init info", KR(ret), KPC(result));
|
||||
} else if (OB_FAIL(ls_access_info.push_back(info))) {
|
||||
LOG_WARN("failed to push back info", KR(ret), K(info));
|
||||
} else if (OB_TMP_FAIL(success_ls_ids.push_back(info.get_ls_id()))) {
|
||||
LOG_WARN("fail to push back", KR(ret), KR(tmp_ret), K(success_ls_ids), K(info));
|
||||
} else if (OB_FAIL(ls_access_info.push_back(*result))) {
|
||||
LOG_WARN("failed to push back info", KR(ret), KPC(result));
|
||||
} else if (OB_TMP_FAIL(success_ls_ids.push_back(result->get_ls_id()))) {
|
||||
LOG_WARN("fail to push back", KR(ret), KR(tmp_ret), K(success_ls_ids), KPC(result));
|
||||
}
|
||||
}
|
||||
LOG_INFO("[ROLE_TRANSITION] get ls access mode", KR(ret), K(arg));
|
||||
@ -755,7 +706,7 @@ int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<LSAccessModeInfo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<LSAccessModeInfo> &ls_access_info,
|
||||
int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<ObLSAccessModeInfo> &ls_access_info,
|
||||
palf::AccessMode target_access_mode, const SCN &ref_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -776,16 +727,21 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<LSAc
|
||||
LOG_WARN("fail to set timeout ctx", KR(ret));
|
||||
} else {
|
||||
ObChangeLSAccessModeProxy proxy(*rpc_proxy_, &obrpc::ObSrvRpcProxy::change_ls_access_mode);
|
||||
ObAddr leader;
|
||||
obrpc::ObLSAccessModeInfo arg;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ls_access_info.count(); ++i) {
|
||||
const LSAccessModeInfo &info = ls_access_info.at(i);
|
||||
const obrpc::ObLSAccessModeInfo &info = ls_access_info.at(i);
|
||||
const int64_t timeout = ctx.get_timeout();
|
||||
if (OB_FAIL(arg.init(tenant_id_, info.ls_id_, info.mode_version_, target_access_mode, ref_scn, info.leader_addr_))) {
|
||||
LOG_WARN("failed to init arg", KR(ret), K(info), K(target_access_mode), K(ref_scn));
|
||||
if (OB_FAIL(GCTX.location_service_->get_leader(
|
||||
GCONF.cluster_id, tenant_id_, info.get_ls_id(), false, leader))) {
|
||||
LOG_WARN("failed to get leader", KR(ret), K(tenant_id_), K(info));
|
||||
// use meta rpc process thread
|
||||
} else if (OB_FAIL(proxy.call(info.leader_addr_, timeout, GCONF.cluster_id, gen_meta_tenant_id(tenant_id_), arg))) {
|
||||
} else if (OB_FAIL(arg.init(tenant_id_, info.get_ls_id(), info.get_mode_version(),
|
||||
target_access_mode, ref_scn))) {
|
||||
LOG_WARN("failed to init arg", KR(ret), K(info), K(target_access_mode), K(ref_scn));
|
||||
} else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, gen_meta_tenant_id(tenant_id_), arg))) {
|
||||
//can not ignore of each ls
|
||||
LOG_WARN("failed to send rpc", KR(ret), K(info), K(timeout), K(tenant_id_), K(arg));
|
||||
LOG_WARN("failed to send rpc", KR(ret), K(arg), K(timeout), K(tenant_id_), K(info));
|
||||
}
|
||||
}//end for
|
||||
//result
|
||||
@ -805,6 +761,7 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<LSAc
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
|
||||
ret = return_code_array.at(i);
|
||||
const auto *result = proxy.get_results().at(i);
|
||||
const obrpc::ObLSAccessModeInfo &info = ls_access_info.at(i);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("send rpc is failed", KR(ret), K(i));
|
||||
} else if (OB_ISNULL(result)) {
|
||||
@ -816,7 +773,7 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<LSAc
|
||||
LOG_WARN("fail to push back", KR(ret), KR(tmp_ret), K(success_ls_ids), K(result));
|
||||
}
|
||||
|
||||
LOG_INFO("[ROLE_TRANSITION] change ls access mode", KR(ret), K(arg), KPC(result), K(proxy.get_dests()));
|
||||
LOG_INFO("[ROLE_TRANSITION] change ls access mode", KR(ret), K(info), KPC(result), K(proxy.get_dests()));
|
||||
}// end for
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
|
||||
@ -24,6 +24,7 @@ namespace oceanbase
|
||||
namespace obrpc
|
||||
{
|
||||
class ObSrvRpcProxy;
|
||||
struct ObLSAccessModeInfo;
|
||||
}
|
||||
namespace common
|
||||
{
|
||||
@ -71,41 +72,6 @@ int do_nonblock_renew(const ARRAY &array_l, const ARRAY &array_r, const uint64_t
|
||||
*/
|
||||
class ObTenantRoleTransitionService
|
||||
{
|
||||
public:
|
||||
struct LSAccessModeInfo
|
||||
{
|
||||
LSAccessModeInfo(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_(),
|
||||
leader_addr_(),
|
||||
mode_version_(palf::INVALID_PROPOSAL_ID),
|
||||
access_mode_(palf::AccessMode::INVALID_ACCESS_MODE) { }
|
||||
virtual ~LSAccessModeInfo() {}
|
||||
bool is_valid() const
|
||||
{
|
||||
return OB_INVALID_TENANT_ID != tenant_id_
|
||||
&& ls_id_.is_valid()
|
||||
&& leader_addr_.is_valid()
|
||||
&& palf::INVALID_PROPOSAL_ID != mode_version_
|
||||
&& palf::AccessMode::INVALID_ACCESS_MODE != access_mode_;
|
||||
|
||||
}
|
||||
int init(uint64_t tenant_id, const share::ObLSID &ls_id, const ObAddr &addr,
|
||||
const int64_t mode_version, const palf::AccessMode &access_mode);
|
||||
int assign(const LSAccessModeInfo &other);
|
||||
void reset();
|
||||
share::ObLSID get_ls_id() const
|
||||
{
|
||||
return ls_id_;
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(leader_addr),
|
||||
K_(mode_version), K_(access_mode));
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
ObAddr leader_addr_;
|
||||
int64_t mode_version_;
|
||||
palf::AccessMode access_mode_;
|
||||
};
|
||||
|
||||
public:
|
||||
ObTenantRoleTransitionService(const uint64_t tenant_id,
|
||||
common::ObMySQLProxy *sql_proxy,
|
||||
@ -196,8 +162,8 @@ private:
|
||||
int do_flashback_();
|
||||
int change_ls_access_mode_(palf::AccessMode target_access_mode,
|
||||
const share::SCN &ref_scn);
|
||||
int get_ls_access_mode_(ObIArray<LSAccessModeInfo> &ls_access_info);
|
||||
int do_change_ls_access_mode_(const ObIArray<LSAccessModeInfo> &ls_access_info,
|
||||
int get_ls_access_mode_(ObIArray<obrpc::ObLSAccessModeInfo> &ls_access_info);
|
||||
int do_change_ls_access_mode_(const ObIArray<obrpc::ObLSAccessModeInfo> &ls_access_info,
|
||||
palf::AccessMode target_access_mode,
|
||||
const share::SCN &ref_scn);
|
||||
int do_switch_access_mode_to_flashback(
|
||||
|
||||
@ -7412,33 +7412,29 @@ bool ObLSAccessModeInfo::is_valid() const
|
||||
&& ls_id_.is_valid()
|
||||
&& palf::INVALID_PROPOSAL_ID != mode_version_
|
||||
&& palf::AccessMode::INVALID_ACCESS_MODE != access_mode_
|
||||
&& ref_scn_.is_valid()
|
||||
&& addr_.is_valid();
|
||||
&& ref_scn_.is_valid();
|
||||
}
|
||||
int ObLSAccessModeInfo::init(
|
||||
uint64_t tenant_id, const ObLSID &ls_id,
|
||||
const int64_t mode_version,
|
||||
const palf::AccessMode &access_mode,
|
||||
const SCN &ref_scn,
|
||||
const ObAddr &addr)
|
||||
const SCN &ref_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|
||||
|| !ls_id.is_valid()
|
||||
|| palf::AccessMode::INVALID_ACCESS_MODE == access_mode
|
||||
|| palf::INVALID_PROPOSAL_ID == mode_version
|
||||
|| !ref_scn.is_valid()
|
||||
|| !addr.is_valid())) {
|
||||
|| !ref_scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id),
|
||||
K(mode_version), K(access_mode), K(ref_scn), K(addr));
|
||||
K(mode_version), K(access_mode), K(ref_scn));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
mode_version_ = mode_version;
|
||||
access_mode_ = access_mode;
|
||||
ref_scn_ = ref_scn;
|
||||
addr_ = addr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -7451,7 +7447,6 @@ int ObLSAccessModeInfo::assign(const ObLSAccessModeInfo &other)
|
||||
mode_version_ = other.mode_version_;
|
||||
access_mode_ = other.access_mode_;
|
||||
ref_scn_ = other.ref_scn_;
|
||||
addr_ = other.addr_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -3207,18 +3207,16 @@ public:
|
||||
ls_id_(),
|
||||
mode_version_(palf::INVALID_PROPOSAL_ID),
|
||||
access_mode_(palf::AccessMode::INVALID_ACCESS_MODE),
|
||||
ref_scn_(),
|
||||
addr_() {}
|
||||
ref_scn_(), addr_() {}
|
||||
~ObLSAccessModeInfo() {}
|
||||
bool is_valid() const;
|
||||
int init(uint64_t tenant_id, const share::ObLSID &ls_idd,
|
||||
const int64_t mode_version,
|
||||
const palf::AccessMode &access_mode,
|
||||
const share::SCN &ref_scn,
|
||||
const ObAddr &addr);
|
||||
const share::SCN &ref_scn);
|
||||
int assign(const ObLSAccessModeInfo &other);
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(mode_version),
|
||||
K_(access_mode), K_(ref_scn), K_(addr));
|
||||
K_(access_mode), K_(ref_scn));
|
||||
uint64_t get_tenant_id() const
|
||||
{
|
||||
return tenant_id_;
|
||||
@ -3239,11 +3237,6 @@ public:
|
||||
{
|
||||
return ref_scn_;
|
||||
}
|
||||
|
||||
const ObAddr &get_addr() const
|
||||
{
|
||||
return addr_;
|
||||
}
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObLSAccessModeInfo);
|
||||
private:
|
||||
@ -3252,7 +3245,7 @@ private:
|
||||
int64_t mode_version_;
|
||||
palf::AccessMode access_mode_;
|
||||
share::SCN ref_scn_;
|
||||
ObAddr addr_;
|
||||
ObAddr addr_;//no used, add in 4200 RC1
|
||||
};
|
||||
|
||||
struct ObChangeLSAccessModeRes
|
||||
|
||||
Reference in New Issue
Block a user