notify rs when ls restore finish/open archive mode/close archive mode
This commit is contained in:
parent
f0a728b62a
commit
fe111d136b
@ -1005,6 +1005,8 @@ PCODE_DEF(OB_NOTIFY_ARCHIVE, 0x142A)
|
||||
PCODE_DEF(OB_CHANGE_EXTERNAL_STORAGE_DEST, 0x142B)
|
||||
PCODE_DEF(OB_UPDATE_TENANT_QUICK_RESTORE, 0x142C)
|
||||
PCODE_DEF(OB_BACKUP_FUSE_TABLET_META, 0x142D)
|
||||
PCODE_DEF(OB_NOTIFY_LS_RESTORE_FINISH, 0x142E)
|
||||
PCODE_DEF(OB_NOTIFY_START_ARCHIVE, 0x142F)
|
||||
// backup and restore end 0x14ff
|
||||
|
||||
// logservice
|
||||
|
@ -97,6 +97,8 @@
|
||||
#include "close_modules/shared_storage/storage/shared_storage/ob_ss_micro_cache_io_helper.h"
|
||||
#endif
|
||||
#include "share/object_storage/ob_device_config_mgr.h"
|
||||
#include "rootserver/restore/ob_restore_service.h"
|
||||
#include "rootserver/backup/ob_archive_scheduler_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -4061,5 +4063,45 @@ int ObNotifySharedStorageInfoP::process()
|
||||
result_.set_ret(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcNotifyLSRestoreFinishP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!arg_.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg_));
|
||||
} else {
|
||||
MTL_SWITCH(gen_meta_tenant_id(arg_.get_tenant_id())) {
|
||||
rootserver::ObRestoreService* restore_service = MTL(rootserver::ObRestoreService*);
|
||||
if (OB_ISNULL(restore_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("restore service is null", KR(ret), K(arg_));
|
||||
} else {
|
||||
restore_service->wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcStartArchiveP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!arg_.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(arg_));
|
||||
} else {
|
||||
MTL_SWITCH(gen_meta_tenant_id(arg_.get_tenant_id())) {
|
||||
rootserver::ObArchiveSchedulerService* archive_service = MTL(rootserver::ObArchiveSchedulerService*);
|
||||
if (OB_ISNULL(archive_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("archive service is null", KR(ret), K(arg_));
|
||||
} else {
|
||||
archive_service->wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // end of namespace observer
|
||||
} // end of namespace oceanbase
|
||||
|
@ -317,6 +317,8 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_FETCH_STABLE_MEMBER_LIST, ObFetchStableMemberListP
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CHANGE_EXTERNAL_STORAGE_DEST, ObRpcChangeExternalStorageDestP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_QUERY_CLIENT_SESSION, ObKillQueryClientSessionP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_SHARED_STORAGE_INFO, ObNotifySharedStorageInfoP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_LS_RESTORE_FINISH, ObRpcNotifyLSRestoreFinishP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_START_ARCHIVE, ObRpcStartArchiveP);
|
||||
} // end of namespace observer
|
||||
} // end of namespace oceanbase
|
||||
|
||||
|
@ -168,4 +168,6 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) {
|
||||
RPC_PROCESSOR(ObSetSSCkptCompressorP, gctx_);
|
||||
#endif
|
||||
RPC_PROCESSOR(ObNotifySharedStorageInfoP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcNotifyLSRestoreFinishP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcStartArchiveP, gctx_);
|
||||
}
|
||||
|
@ -437,6 +437,8 @@ int ObArchiveSchedulerService::open_archive_mode(const uint64_t tenant_id, const
|
||||
} else {
|
||||
if (OB_FAIL(open_tenant_archive_mode_(archive_tenant_ids))) {
|
||||
LOG_WARN("failed to open archive mode for indicated tenants", K(ret), K(archive_tenant_ids));
|
||||
} else if (1 == archive_tenant_ids.count()) {
|
||||
notify_start_archive_(archive_tenant_ids.at(0));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -446,6 +448,8 @@ int ObArchiveSchedulerService::open_archive_mode(const uint64_t tenant_id, const
|
||||
LOG_WARN("normal tenant can only open archive mode for itself.", K(ret), K(tenant_id), K(archive_tenant_ids));
|
||||
} else if (OB_FAIL(open_tenant_archive_mode_(tenant_id))) {
|
||||
LOG_WARN("failed to open archive mode", K(ret), K(tenant_id));
|
||||
} else {
|
||||
notify_start_archive_(tenant_id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -522,6 +526,8 @@ int ObArchiveSchedulerService::close_archive_mode(
|
||||
} else {
|
||||
if (OB_FAIL(close_tenant_archive_mode_(archive_tenant_ids))) {
|
||||
LOG_WARN("failed to close archive mode for indicated tenants", K(ret), K(archive_tenant_ids));
|
||||
} else if (1 == archive_tenant_ids.count()) {
|
||||
notify_start_archive_(archive_tenant_ids.at(0));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -531,6 +537,8 @@ int ObArchiveSchedulerService::close_archive_mode(
|
||||
LOG_WARN("normal tenant can only close archive mode for itself.", K(ret), K(tenant_id), K(archive_tenant_ids));
|
||||
} else if (OB_FAIL(close_tenant_archive_mode_(tenant_id))) {
|
||||
LOG_WARN("failed to close archive mode", K(ret), K(tenant_id));
|
||||
} else {
|
||||
notify_start_archive_(tenant_id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -561,3 +569,21 @@ int ObArchiveSchedulerService::close_tenant_archive_mode_(const uint64_t tenant_
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObArchiveSchedulerService::notify_start_archive_(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObAddr leader_addr;
|
||||
obrpc::ObNotifyStartArchiveArg arg;
|
||||
arg.set_tenant_id(tenant_id);
|
||||
|
||||
if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc proxy or location service is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_));
|
||||
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
|
||||
GCONF.cluster_id, gen_meta_tenant_id(tenant_id), ObLSID(ObLSID::SYS_LS_ID), leader_addr))) {
|
||||
LOG_WARN("failed to get meta tenant leader address", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_start_archive(arg))) {
|
||||
LOG_WARN("failed to notify tenant archive scheduler service", KR(ret), K(leader_addr), K(tenant_id), K(arg));
|
||||
}
|
||||
}
|
@ -80,6 +80,8 @@ private:
|
||||
int open_tenant_archive_mode_(const uint64_t tenant_id);
|
||||
int close_tenant_archive_mode_(const common::ObIArray<uint64_t> &tenant_ids_array);
|
||||
int close_tenant_archive_mode_(const uint64_t tenant_id);
|
||||
// notify arhicve start/end to tenant's rs
|
||||
void notify_start_archive_(const uint64_t tenant_id);
|
||||
|
||||
bool is_inited_;
|
||||
uint64_t tenant_id_;
|
||||
|
@ -13421,5 +13421,38 @@ int ObNotifySharedStorageInfoResult::get_ret() const
|
||||
{
|
||||
return ret_;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObNotifyLSRestoreFinishArg, tenant_id_, ls_id_);
|
||||
bool ObNotifyLSRestoreFinishArg::is_valid() const
|
||||
{
|
||||
return is_user_tenant(tenant_id_) && ls_id_.is_valid();
|
||||
}
|
||||
|
||||
int ObNotifyLSRestoreFinishArg::assign(const ObNotifyLSRestoreFinishArg &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this == &other) {
|
||||
} else {
|
||||
tenant_id_ = other.tenant_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObNotifyStartArchiveArg, tenant_id_);
|
||||
bool ObNotifyStartArchiveArg::is_valid() const
|
||||
{
|
||||
return is_user_tenant(tenant_id_);
|
||||
}
|
||||
|
||||
int ObNotifyStartArchiveArg::assign(const ObNotifyStartArchiveArg &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (this == &other) {
|
||||
} else {
|
||||
tenant_id_ = other.tenant_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}//end namespace obrpc
|
||||
}//end namespace oceanbase
|
||||
|
@ -13591,6 +13591,47 @@ public:
|
||||
private:
|
||||
int ret_;
|
||||
};
|
||||
|
||||
struct ObNotifyLSRestoreFinishArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObNotifyLSRestoreFinishArg()
|
||||
: tenant_id_(common::OB_INVALID_TENANT_ID),
|
||||
ls_id_(share::ObLSID::INVALID_LS_ID) {}
|
||||
~ObNotifyLSRestoreFinishArg() {}
|
||||
bool is_valid() const;
|
||||
int assign(const ObNotifyLSRestoreFinishArg &other);
|
||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||
share::ObLSID get_ls_id() const { return ls_id_; }
|
||||
void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
||||
void set_ls_id(const share::ObLSID &ls_id) { ls_id_ = ls_id; }
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id));
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObNotifyLSRestoreFinishArg);
|
||||
private:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
};
|
||||
|
||||
struct ObNotifyStartArchiveArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObNotifyStartArchiveArg()
|
||||
: tenant_id_(common::OB_INVALID_TENANT_ID)
|
||||
{}
|
||||
~ObNotifyStartArchiveArg() {}
|
||||
bool is_valid() const;
|
||||
int assign(const ObNotifyStartArchiveArg &other);
|
||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||
void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
||||
TO_STRING_KV(K_(tenant_id));
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObNotifyStartArchiveArg);
|
||||
private:
|
||||
uint64_t tenant_id_;
|
||||
};
|
||||
}//end namespace obrpc
|
||||
}//end namespace oceanbase
|
||||
#endif
|
||||
|
@ -303,6 +303,8 @@ public:
|
||||
RPC_AP(PR5 collect_mv_merge_info, OB_COLLECT_MV_MERGE_INFO, (obrpc::ObCollectMvMergeInfoArg), obrpc::ObCollectMvMergeInfoResult);
|
||||
RPC_S(PR5 fetch_stable_member_list, OB_FETCH_STABLE_MEMBER_LIST, (obrpc::ObFetchStableMemberListArg), obrpc::ObFetchStableMemberListInfo);
|
||||
RPC_S(PR5 notify_shared_storage_info, OB_NOTIFY_SHARED_STORAGE_INFO, (obrpc::ObNotifySharedStorageInfoArg), obrpc::ObNotifySharedStorageInfoResult);
|
||||
RPC_S(PR5 notify_ls_restore_finish, OB_NOTIFY_LS_RESTORE_FINISH, (obrpc::ObNotifyLSRestoreFinishArg));
|
||||
RPC_S(PR5 notify_start_archive, OB_NOTIFY_START_ARCHIVE, (obrpc::ObNotifyStartArchiveArg));
|
||||
}; // end of class ObSrvRpcProxy
|
||||
|
||||
} // end of namespace rpc
|
||||
|
@ -874,9 +874,21 @@ int ObILSRestoreState::advance_status_(
|
||||
if (OB_SUCCESS != (tmp_ret = report_ls_restore_progress_(ls, next_status, *ObCurTraceId::get_trace_id()))) {
|
||||
LOG_WARN("fail to reprot ls restore progress", K(tmp_ret), K(ls), K(next_status));
|
||||
}
|
||||
|
||||
if (need_notify_rs_restore_finish_(next_status)) {
|
||||
notify_rs_restore_finish_();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObILSRestoreState::need_notify_rs_restore_finish_(const ObLSRestoreStatus &ls_restore_status)
|
||||
{
|
||||
return ObLSRestoreStatus::WAIT_RESTORE_TO_CONSISTENT_SCN == ls_restore_status
|
||||
|| ObLSRestoreStatus::QUICK_RESTORE_FINISH == ls_restore_status
|
||||
|| ObLSRestoreStatus::NONE == ls_restore_status
|
||||
|| ObLSRestoreStatus::RESTORE_FAILED == ls_restore_status;
|
||||
}
|
||||
|
||||
int ObILSRestoreState::report_ls_restore_progress_(
|
||||
storage::ObLS &ls, const share::ObLSRestoreStatus &status,
|
||||
const share::ObTaskId &trace_id, const int result, const char *comment)
|
||||
@ -1578,6 +1590,26 @@ int ObILSRestoreState::report_unfinished_bytes(const int64_t bytes)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObILSRestoreState::notify_rs_restore_finish_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = ls_restore_arg_->tenant_id_;
|
||||
common::ObAddr leader_addr;
|
||||
obrpc::ObNotifyLSRestoreFinishArg arg;
|
||||
arg.set_tenant_id(tenant_id);
|
||||
arg.set_ls_id(ls_->get_ls_id());
|
||||
|
||||
if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc proxy or location service is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_));
|
||||
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
|
||||
GCONF.cluster_id, gen_meta_tenant_id(tenant_id), ObLSID(ObLSID::SYS_LS_ID), leader_addr))) {
|
||||
LOG_WARN("failed to get meta tenant leader address", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_ls_restore_finish(arg))) {
|
||||
LOG_WARN("failed to notify tenant restore scheduler", KR(ret), K(leader_addr), K(arg));
|
||||
}
|
||||
}
|
||||
|
||||
//================================ObLSRestoreStartState=======================================
|
||||
ObLSRestoreStartState::ObLSRestoreStartState()
|
||||
: ObILSRestoreState(ObLSRestoreStatus::Status::RESTORE_START)
|
||||
|
@ -264,6 +264,9 @@ protected:
|
||||
int check_replay_to_target_scn_(
|
||||
const share::SCN &target_scn,
|
||||
bool &replayed) const;
|
||||
bool need_notify_rs_restore_finish_(const ObLSRestoreStatus &ls_restore_status);
|
||||
|
||||
void notify_rs_restore_finish_();
|
||||
|
||||
protected:
|
||||
bool is_inited_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user