fix standby migration check transfer scn issue

This commit is contained in:
oceanoverflow
2023-07-25 13:42:48 +00:00
committed by ob-robot
parent 18dfee31f9
commit 8ba5949ac5
6 changed files with 195 additions and 44 deletions

View File

@ -14,6 +14,7 @@
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/high_availability/ob_storage_ha_utils.h"
#include "storage/high_availability/ob_ls_member_list_service.h"
#include "storage/high_availability/ob_storage_ha_src_provider.h"
namespace oceanbase
{
@ -57,13 +58,14 @@ void ObLSMemberListService::destroy()
}
int ObLSMemberListService::get_config_version_and_transfer_scn(
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn)
{
int ret = OB_SUCCESS;
config_version.reset();
transfer_scn.reset();
if (OB_FAIL(log_handler_->get_leader_config_version(config_version))) {
if (need_get_config_version && OB_FAIL(log_handler_->get_leader_config_version(config_version))) {
STORAGE_LOG(WARN, "failed to get config version", K(ret));
} else if (OB_FAIL(ls_->get_transfer_scn(transfer_scn))) {
STORAGE_LOG(WARN, "failed to get transfer scn", K(ret), KP_(ls));
@ -78,15 +80,11 @@ int ObLSMemberListService::add_member(
{
int ret = OB_SUCCESS;
palf::LogConfigVersion leader_config_version;
share::SCN leader_transfer_scn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_FAIL(get_leader_config_version_and_transfer_scn_(
leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get leader config version and transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_(leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_validity_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity", K(ret));
} else if (OB_FAIL(log_handler_->add_member(member,
paxos_replica_num,
leader_config_version,
@ -105,22 +103,18 @@ int ObLSMemberListService::replace_member(
{
int ret = OB_SUCCESS;
palf::LogConfigVersion leader_config_version;
share::SCN leader_transfer_scn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_FAIL(get_leader_config_version_and_transfer_scn_(
leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get leader config version and transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_(leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_validity_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity", K(ret));
} else if (OB_FAIL(log_handler_->replace_member(added_member,
removed_member,
leader_config_version,
timeout))) {
STORAGE_LOG(WARN, "failed to add member", K(ret), K(added_member), K(removed_member), K(leader_config_version));
} else {
STORAGE_LOG(INFO, "replace member success", K(ret), K(added_member), K(removed_member), K(leader_config_version), K(leader_transfer_scn));
STORAGE_LOG(INFO, "replace member success", K(ret), K(added_member), K(removed_member), K(leader_config_version));
}
return ret;
}
@ -133,15 +127,11 @@ int ObLSMemberListService::replace_member_with_learner(
{
int ret = OB_SUCCESS;
palf::LogConfigVersion leader_config_version;
share::SCN leader_transfer_scn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_FAIL(get_leader_config_version_and_transfer_scn_(
leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get leader config version and transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_(leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_validity_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity", K(ret));
} else if (OB_FAIL(log_handler_->replace_member_with_learner(added_member,
removed_member,
leader_config_version,
@ -160,15 +150,11 @@ int ObLSMemberListService::switch_learner_to_acceptor(
{
int ret = OB_SUCCESS;
palf::LogConfigVersion leader_config_version;
share::SCN leader_transfer_scn;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_FAIL(get_leader_config_version_and_transfer_scn_(
leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get leader config version and transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_(leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret), K(leader_config_version));
} else if (OB_FAIL(check_ls_transfer_scn_validity_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity", K(ret));
} else if (OB_FAIL(log_handler_->switch_learner_to_acceptor(learner,
paxos_replica_num,
leader_config_version,
@ -186,44 +172,169 @@ int ObLSMemberListService::get_leader_config_version_and_transfer_scn_(
{
int ret = OB_SUCCESS;
ObLSService *ls_svr = NULL;
ObStorageRpc *storage_rpc = NULL;
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
ObStorageHASrcInfo src_info;
src_info.cluster_id_ = GCONF.cluster_id;
common::ObAddr addr;
const bool need_get_config_version = true;
if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ls_->get_tenant_id(), ls_->get_ls_id(), src_info.src_addr_))) {
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ls_->get_tenant_id(), ls_->get_ls_id(), addr))) {
STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_));
} else if (OB_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr));
}
return ret;
}
int ObLSMemberListService::get_config_version_and_transfer_scn_(
const bool need_get_config_version,
const common::ObAddr &addr,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn)
{
int ret = OB_SUCCESS;
ObLSService *ls_svr = NULL;
ObStorageRpc *storage_rpc = NULL;
ObStorageHASrcInfo src_info;
src_info.cluster_id_ = GCONF.cluster_id;
src_info.src_addr_ = addr;
if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "storage rpc should not be NULL", K(ret), KP(storage_rpc));
} else if (OB_FAIL(storage_rpc->get_config_version_and_transfer_scn(ls_->get_tenant_id(),
src_info,
ls_->get_ls_id(),
leader_config_version,
leader_transfer_scn))) {
need_get_config_version,
config_version,
transfer_scn))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), KPC(ls_));
}
return ret;
}
// TODO(yangyi.yyy): how to check standby transfer scn
int ObLSMemberListService::check_ls_transfer_scn_(const share::SCN &leader_transfer_scn)
int ObLSMemberListService::check_ls_transfer_scn_(const share::SCN &transfer_scn, bool &check_pass)
{
int ret = OB_SUCCESS;
check_pass = false;
share::SCN local_transfer_scn;
if (OB_ISNULL(ls_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls should not be null", K(ret), KP_(ls));
} else if (OB_FAIL(ls_->get_transfer_scn(local_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get transfer scn", K(ret), KP_(ls));
} else if (leader_transfer_scn > local_transfer_scn) {
ret = OB_LS_TRANSFER_SCN_TOO_SMALL;
} else if (transfer_scn > local_transfer_scn) {
STORAGE_LOG(WARN, "local transfer scn is less than leader transfer scn",
K(ret), K(leader_transfer_scn), K(local_transfer_scn));
K(ret), K(transfer_scn), K(local_transfer_scn));
} else {
STORAGE_LOG(INFO, "check ls transfer scn", KPC_(ls), K(leader_transfer_scn), K(local_transfer_scn));
check_pass = true;
STORAGE_LOG(INFO, "check ls transfer scn", KPC_(ls), K(transfer_scn), K(local_transfer_scn));
}
return ret;
}
int ObLSMemberListService::get_ls_member_list_(common::ObIArray<common::ObAddr> &addr_list)
{
int ret = OB_SUCCESS;
ObStorageHASrcProvider provider;
ObMigrationOpType::TYPE type = ObMigrationOpType::MIGRATE_LS_OP;
ObLSService *ls_svr = NULL;
ObStorageRpc *storage_rpc = NULL;
if (OB_ISNULL(ls_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls should not be null", K(ret), KP_(ls));
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "storage rpc should not be NULL", K(ret), KP(storage_rpc));
} else if (OB_FAIL(provider.init(ls_->get_tenant_id(), type, storage_rpc))) {
STORAGE_LOG(WARN, "failed to init src provider", K(ret), KP_(ls));
} else if (OB_FAIL(provider.get_ls_member_list(ls_->get_tenant_id(), ls_->get_ls_id(), addr_list))) {
STORAGE_LOG(WARN, "failed to get ls member list", K(ret), KP_(ls));
}
return ret;
}
int ObLSMemberListService::check_ls_transfer_scn_validity_(palf::LogConfigVersion &leader_config_version)
{
int ret = OB_SUCCESS;
if (MTL_IS_PRIMARY_TENANT()) {
if (OB_FAIL(check_ls_transfer_scn_validity_for_primary_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity for primary", K(ret), KP_(ls));
}
} else {
if (OB_FAIL(check_ls_transfer_scn_validity_for_standby_(leader_config_version))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn validity for standby", K(ret), KP_(ls));
}
}
return ret;
}
int ObLSMemberListService::check_ls_transfer_scn_validity_for_primary_(palf::LogConfigVersion &leader_config_version)
{
int ret = OB_SUCCESS;
bool check_pass = false;
share::SCN leader_transfer_scn;
if (OB_FAIL(get_leader_config_version_and_transfer_scn_(
leader_config_version, leader_transfer_scn))) {
STORAGE_LOG(WARN, "failed to get leader config version and transfer scn", K(ret));
} else if (OB_FAIL(check_ls_transfer_scn_(leader_transfer_scn, check_pass))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret), K(leader_config_version));
} else if (!check_pass) {
ret = OB_LS_TRANSFER_SCN_TOO_SMALL;
STORAGE_LOG(WARN, "ls transfer scn too small", K(ret), K(leader_transfer_scn));
}
return ret;
}
int ObLSMemberListService::check_ls_transfer_scn_validity_for_standby_(palf::LogConfigVersion &leader_config_version)
{
int ret = OB_SUCCESS;
ObArray<ObAddr> addr_list;
ObAddr leader_addr;
if (OB_ISNULL(ls_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ls should not be null", K(ret));
} else if (OB_FAIL(get_ls_member_list_(addr_list))) {
STORAGE_LOG(WARN, "failed to get ls member list", K(ret));
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(ls_->get_tenant_id(), ls_->get_ls_id(), leader_addr))) {
STORAGE_LOG(WARN, "failed to get ls leader", K(ret), KPC(ls_));
} else {
int64_t check_pass_count = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < addr_list.count(); ++i) {
const ObAddr &addr = addr_list.at(i);
bool check_pass = false;
share::SCN transfer_scn;
palf::LogConfigVersion config_version;
bool need_get_config_version = (addr == leader_addr);
if (OB_FAIL(get_config_version_and_transfer_scn_(need_get_config_version, addr, config_version, transfer_scn))) {
STORAGE_LOG(WARN, "failed to get config version and transfer scn", K(ret), K(addr));
} else if (OB_FAIL(check_ls_transfer_scn_(transfer_scn, check_pass))) {
STORAGE_LOG(WARN, "failed to check ls transfer scn", K(ret), K(transfer_scn));
} else {
if (addr == leader_addr) {
if (!config_version.is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "config version is not valid", K(ret), K(config_version));
} else {
leader_config_version = config_version;
}
}
check_pass_count++;
}
}
if (OB_SUCC(ret)) {
// standby check transfer scn need reach majority
if (check_pass_count < (addr_list.count() / 2 + 1)) {
ret = OB_LS_TRANSFER_SCN_TOO_SMALL;
STORAGE_LOG(WARN, "transfer scn compare do not reach majority", K(ret), K(addr_list));
} else {
STORAGE_LOG(INFO, "passed transfer scn check for standby", K(ret), K(addr_list), K(check_pass_count));
}
}
}
return ret;
}

View File

@ -31,6 +31,7 @@ public:
public:
int get_config_version_and_transfer_scn(
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn);
int add_member(const common::ObMember &member,
@ -50,7 +51,16 @@ private:
int get_leader_config_version_and_transfer_scn_(
palf::LogConfigVersion &leader_config_version,
share::SCN &leader_transfer_scn);
int check_ls_transfer_scn_(const share::SCN &transfer_scn);
int get_config_version_and_transfer_scn_(
const bool need_get_config_version,
const common::ObAddr &addr,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn);
int check_ls_transfer_scn_(const share::SCN &transfer_scn, bool &is_match);
int get_ls_member_list_(common::ObIArray<common::ObAddr> &addr_list);
int check_ls_transfer_scn_validity_(palf::LogConfigVersion &leader_config_version);
int check_ls_transfer_scn_validity_for_primary_(palf::LogConfigVersion &leader_config_version);
int check_ls_transfer_scn_validity_for_standby_(palf::LogConfigVersion &leader_config_version);
private:
bool is_inited_;
storage::ObLS *ls_;

View File

@ -98,6 +98,26 @@ int ObStorageHASrcProvider::choose_ob_src(const share::ObLSID &ls_id, const SCN
return ret;
}
int ObStorageHASrcProvider::get_ls_member_list(const uint64_t tenant_id,
const share::ObLSID &ls_id, common::ObIArray<common::ObAddr> &addr_list)
{
int ret = OB_SUCCESS;
addr_list.reset();
common::ObAddr leader_addr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("start migration task do not init", K(ret));
} else if (OB_INVALID_ID == tenant_id || !ls_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid args", K(ret), K(tenant_id), K(ls_id));
} else if (OB_FAIL(get_ls_leader_(tenant_id_, ls_id, leader_addr))) {
LOG_WARN("failed to get ls leader", K(ret), K_(tenant_id), K(ls_id));
} else if (OB_FAIL(fetch_ls_member_list_(tenant_id_, ls_id, leader_addr, addr_list))) {
LOG_WARN("failed to fetch ls leader member list", K(ret), K_(tenant_id), K(ls_id), K(leader_addr));
}
return ret;
}
int ObStorageHASrcProvider::get_ls_leader_(const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObAddr &leader)
{
int ret = OB_SUCCESS;

View File

@ -27,6 +27,8 @@ public:
int init(const uint64_t tenant_id, const ObMigrationOpType::TYPE &type, storage::ObStorageRpc *storage_rpc);
int choose_ob_src(const share::ObLSID &ls_id, const share::SCN &local_clog_checkpoint_scn,
ObStorageHASrcInfo &src_info);
int get_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id,
common::ObIArray<common::ObAddr> &addr_list);
private:
int get_ls_leader_(const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObAddr &addr);

View File

@ -931,7 +931,8 @@ OB_SERIALIZE_MEMBER(ObCheckTransferTabletBackfillRes, backfill_finished_);
ObStorageChangeMemberArg::ObStorageChangeMemberArg()
: tenant_id_(OB_INVALID_ID),
ls_id_()
ls_id_(),
need_get_config_version_(true)
{
}
@ -947,7 +948,7 @@ void ObStorageChangeMemberArg::reset()
ls_id_.reset();
}
OB_SERIALIZE_MEMBER(ObStorageChangeMemberArg, tenant_id_, ls_id_);
OB_SERIALIZE_MEMBER(ObStorageChangeMemberArg, tenant_id_, ls_id_, need_get_config_version_);
ObStorageChangeMemberRes::ObStorageChangeMemberRes()
: config_version_(),
@ -2752,6 +2753,7 @@ int ObStorageGetConfigVersionAndTransferScnP::process()
int ret = OB_SUCCESS;
const uint64_t tenant_id = arg_.tenant_id_;
const share::ObLSID &ls_id = arg_.ls_id_;
const bool need_get_config_version = arg_.need_get_config_version_;
MTL_SWITCH(tenant_id) {
ObLSHandle ls_handle;
ObLSService *ls_service = NULL;
@ -2765,7 +2767,8 @@ int ObStorageGetConfigVersionAndTransferScnP::process()
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls));
} else if (OB_FAIL(ls->get_config_version_and_transfer_scn(result_.config_version_,
} else if (OB_FAIL(ls->get_config_version_and_transfer_scn(need_get_config_version,
result_.config_version_,
result_.transfer_scn_))) {
LOG_WARN("failed to get config version and transfer scn", K(ret), K(tenant_id), K(ls_id));
} else {
@ -3532,6 +3535,7 @@ int ObStorageRpc::get_config_version_and_transfer_scn(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn)
{
@ -3549,6 +3553,7 @@ int ObStorageRpc::get_config_version_and_transfer_scn(
ObStorageChangeMemberRes res;
arg.tenant_id_ = tenant_id;
arg.ls_id_ = ls_id;
arg.need_get_config_version_ = need_get_config_version;
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)

View File

@ -580,9 +580,10 @@ public:
~ObStorageChangeMemberArg() {}
bool is_valid() const;
void reset();
TO_STRING_KV(K_(tenant_id), K_(ls_id));
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(need_get_config_version));
uint64_t tenant_id_;
share::ObLSID ls_id_;
bool need_get_config_version_;
private:
DISALLOW_COPY_AND_ASSIGN(ObStorageChangeMemberArg);
};
@ -1166,6 +1167,7 @@ public:
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn) = 0;
virtual int block_tx(
@ -1291,6 +1293,7 @@ public:
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info,
const share::ObLSID &ls_id,
const bool need_get_config_version,
palf::LogConfigVersion &config_version,
share::SCN &transfer_scn);
virtual int block_tx(