[fix][PALF] migrating/replicating replicas as learners

Co-authored-by: oceanoverflow <oceanoverflow@gmail.com>
Co-authored-by: joseph12138 <17862707376@163.com>
This commit is contained in:
BinChenn
2023-07-17 13:18:17 +00:00
committed by ob-robot
parent b3db035771
commit f397371718
42 changed files with 1608 additions and 277 deletions

View File

@ -69,11 +69,12 @@ class ObGarbageCollector::QueryLSIsValidMemberFunctor
{
public:
QueryLSIsValidMemberFunctor(obrpc::ObSrvRpcProxy *rpc_proxy,
obrpc::ObLogServiceRpcProxy *log_rpc_proxy,
ObLSService *ls_service,
const common::ObAddr &self_addr,
const int64_t gc_seq,
ObGCCandidateArray &gc_candidates)
: rpc_proxy_(rpc_proxy), ls_service_(ls_service), self_addr_(self_addr),
: rpc_proxy_(rpc_proxy), log_rpc_proxy_(log_rpc_proxy), ls_service_(ls_service), self_addr_(self_addr),
gc_seq_(gc_seq), gc_candidates_(gc_candidates), ret_value_(common::OB_SUCCESS) {}
~QueryLSIsValidMemberFunctor() {}
public:
@ -85,7 +86,9 @@ public:
return common::OB_SUCCESS == ret_value_;
}
int get_ret_value() const { return ret_value_; }
TO_STRING_KV(K(self_addr_), K(gc_seq_));
private:
int remove_self_from_learnerlist_(const ObAddr &leader, ObLS *ls);
int handle_ls_array_(const ObAddr &leader,
const ObLSArray &ls_array);
int handle_rpc_response_(const ObAddr &leader,
@ -93,6 +96,7 @@ private:
int try_renew_location_(const ObLSArray &ls_array);
private:
obrpc::ObSrvRpcProxy *rpc_proxy_;
obrpc::ObLogServiceRpcProxy *log_rpc_proxy_;
ObLSService *ls_service_;
common::ObAddr self_addr_;
int64_t gc_seq_;
@ -102,6 +106,38 @@ private:
DISALLOW_COPY_AND_ASSIGN(QueryLSIsValidMemberFunctor);
};
int ObGarbageCollector::QueryLSIsValidMemberFunctor::remove_self_from_learnerlist_(const ObAddr &leader, ObLS *ls)
{
int ret = OB_SUCCESS;
const ObLSID &ls_id = ls->get_ls_id();
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
LogGetPalfStatReq get_palf_stat_req(self_addr_, ls_id.id(), true/*is_to_leader*/);
LogGetPalfStatResp get_palf_stat_resp;
if (OB_FAIL(log_rpc_proxy_->to(leader)
.by(MTL_ID())
.timeout(TIMEOUT_US)
.max_process_handler_time(TIMEOUT_US)
.get_palf_stat(get_palf_stat_req, get_palf_stat_resp))) {
CLOG_LOG(WARN, "get_palf_stat failed", K(ls_id), K(leader), K(get_palf_stat_req));
} else {
const common::GlobalLearnerList &learner_list = get_palf_stat_resp.palf_stat_.learner_list_;
ObMember member;
if (OB_FAIL(learner_list.get_learner_by_addr(self_addr_, member))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
CLOG_LOG(INFO, "self is not in learnerlist", KPC(this), K(leader), K(learner_list), K(ls_id));
} else {
CLOG_LOG(WARN, "failed to get_learner_by_addr", KPC(this), K(leader), K(learner_list), K(ls_id));
}
} else if (OB_FAIL(ls->remove_learner(member, TIMEOUT_US))) {
CLOG_LOG(WARN, "failed to remove_learner", KPC(this), K(leader), K(learner_list), K(ls_id), K(member));
} else {
CLOG_LOG(INFO, "learner is removed from leader", KPC(this), K(leader), K(learner_list), K(ls_id), K(member));
}
}
return ret;
}
int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_ls_array_(const ObAddr &leader,
const ObLSArray &ls_array)
{
@ -161,11 +197,13 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_rpc_response_(const
const ObLSArray &ls_array = response.ls_array_;
const common::ObSEArray<bool, 16> &candidates_status = response.candidates_status_;
const common::ObSEArray<int, 16> &ret_array = response.ret_array_;
const common::ObSEArray<obrpc::LogMemberGCStat, 16> &gc_stat_array = response.gc_stat_array_;
if (ls_array.count() != candidates_status.count()
|| ls_array.count() != ret_array.count()) {
|| ls_array.count() != ret_array.count()
|| ((gc_stat_array.count() > 0) && (gc_stat_array.count() != ls_array.count()))) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "response count not match, unexpected", K(ret), K(leader));
CLOG_LOG(ERROR, "response count not match, unexpected", K(ret), K(leader), K(response));
} else {
for (int64_t index = 0; OB_SUCC(ret) && index < ls_array.count(); index++) {
ObLSHandle handle;
@ -173,6 +211,8 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_rpc_response_(const
ObGCHandler *gc_handler = NULL;
const ObLSID &id = ls_array[index];
const bool is_valid_member = candidates_status[index];
const obrpc::LogMemberGCStat member_gc_stat = gc_stat_array.count() > 0 ?
gc_stat_array[index] : obrpc::LogMemberGCStat::LOG_MEMBER_NORMAL_GC_STAT;
bool need_gc = false;
if (OB_SUCCESS != ret_array[index]) {
CLOG_LOG(INFO, "remote_ret_code is not success, need renew location", K(id), K(leader),
@ -200,7 +240,6 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_rpc_response_(const
candidate.ls_id_ = id;
candidate.ls_status_ = LSStatus::LS_NEED_GC;
candidate.gc_reason_ = NOT_IN_LEADER_MEMBER_LIST;
if (OB_FAIL(gc_candidates_.push_back(candidate))) {
CLOG_LOG(WARN, "gc_candidates push_back failed", K(ret), K(id), K(leader));
} else {
@ -210,7 +249,16 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_rpc_response_(const
CLOG_LOG(INFO, "gc_check_invalid_member_seq set seq", K(tmp_ret), K(id), K(leader), K(gc_seq_), K(need_gc));
}
} else {
CLOG_LOG(INFO, "GC check ls in member list, skip it", K(id), K(leader));
//is valid member, check member_gc_stat
if (obrpc::LogMemberGCStat::LOG_MEMBER_NORMAL_GC_STAT == member_gc_stat) {
CLOG_LOG(INFO, "GC check ls in member list, skip it", K(id), K(leader));
} else if (obrpc::LogMemberGCStat::LOG_LEARNER_IN_MIGRATING == member_gc_stat) {
if (OB_SUCCESS != (tmp_ret = remove_self_from_learnerlist_(leader, ls))) {
CLOG_LOG(WARN, "failed to remove self from learnerlist", K(tmp_ret), K(id), K(leader));
}
} else {
CLOG_LOG(ERROR, "invalid member_gc_stat,", K(id), K(leader), K(member_gc_stat));
}
}
}
}
@ -1010,6 +1058,7 @@ ObGarbageCollector::ObGarbageCollector() : is_inited_(false),
ls_service_(NULL),
rpc_proxy_(NULL),
sql_proxy_(NULL),
log_rpc_proxy_(NULL),
self_addr_(),
seq_(1),
safe_destroy_handler_(),
@ -1024,16 +1073,25 @@ ObGarbageCollector::~ObGarbageCollector()
// Collects the dependencies of the garbage collector (ObLSService and
// ObLogService via MTL, the server RPC proxy, SQL proxy and self address via
// GCTX) and forwards them to gc_service->init().
//
// @param gc_service  garbage collector instance to initialize.
// @return OB_ERR_UNEXPECTED when MTL(ObLogService*) is NULL; otherwise the
//         return value of ObGarbageCollector::init().
int ObGarbageCollector::mtl_init(ObGarbageCollector* &gc_service)
{
  int ret = OB_SUCCESS;
  ObLSService *ls_service = MTL(ObLSService*);
  ObLogService *log_service = MTL(ObLogService*);
  obrpc::ObSrvRpcProxy *rpc_proxy = GCTX.srv_rpc_proxy_;
  common::ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
  const common::ObAddr self_addr = GCTX.self_addr();
  if (OB_ISNULL(log_service)) {
    // The log RPC proxy is taken from the log service, so it must exist first.
    ret = OB_ERR_UNEXPECTED;
    CLOG_LOG(ERROR, "ObLogService is NULL", K(ret));
  } else {
    ret = gc_service->init(ls_service, rpc_proxy, sql_proxy, log_service->get_rpc_proxy(), self_addr);
  }
  return ret;
}
int ObGarbageCollector::init(ObLSService *ls_service,
obrpc::ObSrvRpcProxy *rpc_proxy,
common::ObMySQLProxy *sql_proxy,
obrpc::ObLogServiceRpcProxy *log_rpc_proxy,
const common::ObAddr &self_addr)
{
int ret = OB_SUCCESS;
@ -1041,15 +1099,17 @@ int ObGarbageCollector::init(ObLSService *ls_service,
ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObGarbageCollector is inited twice");
} else if (OB_ISNULL(ls_service) || OB_ISNULL(rpc_proxy)
|| OB_ISNULL(sql_proxy) || !self_addr.is_valid()) {
|| OB_ISNULL(sql_proxy) || OB_ISNULL(log_rpc_proxy)|| !self_addr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", K(ret), KP(ls_service), KP(rpc_proxy), KP(sql_proxy), K(self_addr));
CLOG_LOG(WARN, "invalid arguments", K(ret), KP(ls_service), KP(rpc_proxy), KP(sql_proxy),
KP(log_rpc_proxy), K(self_addr));
} else if (OB_FAIL(safe_destroy_handler_.init())) {
CLOG_LOG(WARN, "safe destroy handler init failed", K(ret));
} else {
ls_service_ = ls_service;
rpc_proxy_ = rpc_proxy;
sql_proxy_ = sql_proxy;
log_rpc_proxy_ = log_rpc_proxy;
self_addr_ = self_addr;
seq_ = 1;
is_inited_ = true;
@ -1104,6 +1164,7 @@ void ObGarbageCollector::destroy()
ls_service_ = NULL;
rpc_proxy_ = NULL;
sql_proxy_ = NULL;
log_rpc_proxy_ = NULL;
self_addr_.reset();
safe_destroy_handler_.destroy();
}
@ -1297,7 +1358,7 @@ int ObGarbageCollector::handle_each_ls_for_member_list_(ServerLSMap &server_ls_m
ObGCCandidateArray &gc_candidates)
{
int ret = OB_SUCCESS;
QueryLSIsValidMemberFunctor functor(rpc_proxy_, ls_service_, self_addr_, seq_, gc_candidates);
QueryLSIsValidMemberFunctor functor(rpc_proxy_, log_rpc_proxy_, ls_service_, self_addr_, seq_, gc_candidates);
if (OB_SUCCESS != server_ls_map.for_each(functor)) {
ret = functor.get_ret_value();
CLOG_LOG(WARN, "handle_each_ls_for_member_list_ failed", K(ret));