diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index ecd9a5521..6baa93f4a 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -1005,6 +1005,8 @@ PCODE_DEF(OB_NOTIFY_ARCHIVE, 0x142A) PCODE_DEF(OB_CHANGE_EXTERNAL_STORAGE_DEST, 0x142B) PCODE_DEF(OB_UPDATE_TENANT_QUICK_RESTORE, 0x142C) PCODE_DEF(OB_BACKUP_FUSE_TABLET_META, 0x142D) +PCODE_DEF(OB_NOTIFY_LS_RESTORE_FINISH, 0x142E) +PCODE_DEF(OB_NOTIFY_START_ARCHIVE, 0x142F) // backup and restore end 0x14ff // logservice diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 712533552..0c0822cf0 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -97,6 +97,8 @@ #include "close_modules/shared_storage/storage/shared_storage/ob_ss_micro_cache_io_helper.h" #endif #include "share/object_storage/ob_device_config_mgr.h" +#include "rootserver/restore/ob_restore_service.h" +#include "rootserver/backup/ob_archive_scheduler_service.h" namespace oceanbase { @@ -4061,5 +4063,45 @@ int ObNotifySharedStorageInfoP::process() result_.set_ret(ret); return ret; } + +int ObRpcNotifyLSRestoreFinishP::process() +{ + int ret = OB_SUCCESS; + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg_)); + } else { + MTL_SWITCH(gen_meta_tenant_id(arg_.get_tenant_id())) { + rootserver::ObRestoreService* restore_service = MTL(rootserver::ObRestoreService*); + if (OB_ISNULL(restore_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("restore service is null", KR(ret), K(arg_)); + } else { + restore_service->wakeup(); + } + } + } + return ret; +} + +int ObRpcStartArchiveP::process() +{ + int ret = OB_SUCCESS; + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg_)); + } else { + MTL_SWITCH(gen_meta_tenant_id(arg_.get_tenant_id())) { + rootserver::ObArchiveSchedulerService* archive_service = MTL(rootserver::ObArchiveSchedulerService*); + if (OB_ISNULL(archive_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("archive service is null", KR(ret), K(arg_)); + } else { + archive_service->wakeup(); + } + } + } + return ret; +} } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 8ebde1cb3..65121d5cc 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -317,6 +317,8 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_FETCH_STABLE_MEMBER_LIST, ObFetchStableMemberListP OB_DEFINE_PROCESSOR_S(Srv, OB_CHANGE_EXTERNAL_STORAGE_DEST, ObRpcChangeExternalStorageDestP); OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_QUERY_CLIENT_SESSION, ObKillQueryClientSessionP); OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_SHARED_STORAGE_INFO, ObNotifySharedStorageInfoP); +OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_LS_RESTORE_FINISH, ObRpcNotifyLSRestoreFinishP); +OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_START_ARCHIVE, ObRpcStartArchiveP); } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index 8f2c8a244..74fb0f22e 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -168,4 +168,6 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObSetSSCkptCompressorP, gctx_); #endif RPC_PROCESSOR(ObNotifySharedStorageInfoP, gctx_); + RPC_PROCESSOR(ObRpcNotifyLSRestoreFinishP, gctx_); + RPC_PROCESSOR(ObRpcStartArchiveP, gctx_); } diff --git a/src/rootserver/backup/ob_archive_scheduler_service.cpp b/src/rootserver/backup/ob_archive_scheduler_service.cpp index 1f2ec1c5f..c1b7e574d 100644 --- a/src/rootserver/backup/ob_archive_scheduler_service.cpp +++ b/src/rootserver/backup/ob_archive_scheduler_service.cpp @@ -437,6 +437,8 @@ int ObArchiveSchedulerService::open_archive_mode(const uint64_t tenant_id, const } else { if (OB_FAIL(open_tenant_archive_mode_(archive_tenant_ids))) { LOG_WARN("failed to open archive mode for indicated tenants", K(ret), K(archive_tenant_ids)); + } else if (1 == archive_tenant_ids.count()) { + notify_start_archive_(archive_tenant_ids.at(0)); } } } else { @@ -446,6 +448,8 @@ int ObArchiveSchedulerService::open_archive_mode(const uint64_t tenant_id, const LOG_WARN("normal tenant can only open archive mode for itself.", K(ret), K(tenant_id), K(archive_tenant_ids)); } else if (OB_FAIL(open_tenant_archive_mode_(tenant_id))) { LOG_WARN("failed to open archive mode", K(ret), K(tenant_id)); + } else { + notify_start_archive_(tenant_id); } } @@ -522,6 +526,8 @@ int ObArchiveSchedulerService::close_archive_mode( } else { if (OB_FAIL(close_tenant_archive_mode_(archive_tenant_ids))) { LOG_WARN("failed to close archive mode for indicated tenants", K(ret), K(archive_tenant_ids)); + } else if (1 == archive_tenant_ids.count()) { + notify_start_archive_(archive_tenant_ids.at(0)); } } } else { @@ -531,6 +537,8 @@ int ObArchiveSchedulerService::close_archive_mode( LOG_WARN("normal tenant can only close archive mode for itself.", K(ret), K(tenant_id), K(archive_tenant_ids)); } else if (OB_FAIL(close_tenant_archive_mode_(tenant_id))) { LOG_WARN("failed to close archive mode", K(ret), K(tenant_id)); + } else { + notify_start_archive_(tenant_id); } } @@ -561,3 +569,21 @@ int ObArchiveSchedulerService::close_tenant_archive_mode_(const uint64_t tenant_ } return ret; } + +void ObArchiveSchedulerService::notify_start_archive_(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + common::ObAddr leader_addr; + obrpc::ObNotifyStartArchiveArg arg; + arg.set_tenant_id(tenant_id); + + if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc proxy or location service is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_)); + } else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout( + GCONF.cluster_id, gen_meta_tenant_id(tenant_id), ObLSID(ObLSID::SYS_LS_ID), leader_addr))) { + LOG_WARN("failed to get meta tenant leader address", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_start_archive(arg))) { + LOG_WARN("failed to notify tenant archive scheduler service", KR(ret), K(leader_addr), K(tenant_id), K(arg)); + } +} \ No newline at end of file diff --git a/src/rootserver/backup/ob_archive_scheduler_service.h b/src/rootserver/backup/ob_archive_scheduler_service.h index 9e01bc23b..8ccebcc34 100644 --- a/src/rootserver/backup/ob_archive_scheduler_service.h +++ b/src/rootserver/backup/ob_archive_scheduler_service.h @@ -80,6 +80,8 @@ private: int open_tenant_archive_mode_(const uint64_t tenant_id); int close_tenant_archive_mode_(const common::ObIArray &tenant_ids_array); int close_tenant_archive_mode_(const uint64_t tenant_id); + // notify arhicve start/end to tenant's rs + void notify_start_archive_(const uint64_t tenant_id); bool is_inited_; uint64_t tenant_id_; diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 25fc6c360..3052c8d99 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -13421,5 +13421,38 @@ int ObNotifySharedStorageInfoResult::get_ret() const { return ret_; } + +OB_SERIALIZE_MEMBER(ObNotifyLSRestoreFinishArg, tenant_id_, ls_id_); +bool ObNotifyLSRestoreFinishArg::is_valid() const +{ + return is_user_tenant(tenant_id_) && ls_id_.is_valid(); +} + +int ObNotifyLSRestoreFinishArg::assign(const ObNotifyLSRestoreFinishArg &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else { + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObNotifyStartArchiveArg, tenant_id_); +bool ObNotifyStartArchiveArg::is_valid() const +{ + return is_user_tenant(tenant_id_); +} + +int ObNotifyStartArchiveArg::assign(const ObNotifyStartArchiveArg &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else { + tenant_id_ = other.tenant_id_; + } + return ret; +} }//end namespace obrpc }//end namespace oceanbase diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 61e4ad296..964fbad03 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -13591,6 +13591,47 @@ public: private: int ret_; }; + +struct ObNotifyLSRestoreFinishArg final +{ + OB_UNIS_VERSION(1); +public: + ObNotifyLSRestoreFinishArg() + : tenant_id_(common::OB_INVALID_TENANT_ID), + ls_id_(share::ObLSID::INVALID_LS_ID) {} + ~ObNotifyLSRestoreFinishArg() {} + bool is_valid() const; + int assign(const ObNotifyLSRestoreFinishArg &other); + uint64_t get_tenant_id() const { return tenant_id_; } + share::ObLSID get_ls_id() const { return ls_id_; } + void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; } + void set_ls_id(const share::ObLSID &ls_id) { ls_id_ = ls_id; } + TO_STRING_KV(K_(tenant_id), K_(ls_id)); +private: + DISALLOW_COPY_AND_ASSIGN(ObNotifyLSRestoreFinishArg); +private: + uint64_t tenant_id_; + share::ObLSID ls_id_; +}; + +struct ObNotifyStartArchiveArg final +{ + OB_UNIS_VERSION(1); +public: + ObNotifyStartArchiveArg() + : tenant_id_(common::OB_INVALID_TENANT_ID) + {} + ~ObNotifyStartArchiveArg() {} + bool is_valid() const; + int assign(const ObNotifyStartArchiveArg &other); + uint64_t get_tenant_id() const { return tenant_id_; } + void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; } + TO_STRING_KV(K_(tenant_id)); +private: + DISALLOW_COPY_AND_ASSIGN(ObNotifyStartArchiveArg); +private: + uint64_t tenant_id_; +}; }//end namespace obrpc }//end namespace oceanbase #endif diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 4c10f497a..8db04ff1e 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -303,6 +303,8 @@ public: RPC_AP(PR5 collect_mv_merge_info, OB_COLLECT_MV_MERGE_INFO, (obrpc::ObCollectMvMergeInfoArg), obrpc::ObCollectMvMergeInfoResult); RPC_S(PR5 fetch_stable_member_list, OB_FETCH_STABLE_MEMBER_LIST, (obrpc::ObFetchStableMemberListArg), obrpc::ObFetchStableMemberListInfo); RPC_S(PR5 notify_shared_storage_info, OB_NOTIFY_SHARED_STORAGE_INFO, (obrpc::ObNotifySharedStorageInfoArg), obrpc::ObNotifySharedStorageInfoResult); + RPC_S(PR5 notify_ls_restore_finish, OB_NOTIFY_LS_RESTORE_FINISH, (obrpc::ObNotifyLSRestoreFinishArg)); + RPC_S(PR5 notify_start_archive, OB_NOTIFY_START_ARCHIVE, (obrpc::ObNotifyStartArchiveArg)); }; // end of class ObSrvRpcProxy } // end of namespace rpc diff --git a/src/storage/restore/ob_ls_restore_handler.cpp b/src/storage/restore/ob_ls_restore_handler.cpp index 84d1ca557..f68cb1123 100644 --- a/src/storage/restore/ob_ls_restore_handler.cpp +++ b/src/storage/restore/ob_ls_restore_handler.cpp @@ -874,9 +874,21 @@ int ObILSRestoreState::advance_status_( if (OB_SUCCESS != (tmp_ret = report_ls_restore_progress_(ls, next_status, *ObCurTraceId::get_trace_id()))) { LOG_WARN("fail to reprot ls restore progress", K(tmp_ret), K(ls), K(next_status)); } + + if (need_notify_rs_restore_finish_(next_status)) { + notify_rs_restore_finish_(); + } return ret; } +bool ObILSRestoreState::need_notify_rs_restore_finish_(const ObLSRestoreStatus &ls_restore_status) +{ + return ObLSRestoreStatus::WAIT_RESTORE_TO_CONSISTENT_SCN == ls_restore_status + || ObLSRestoreStatus::QUICK_RESTORE_FINISH == ls_restore_status + || ObLSRestoreStatus::NONE == ls_restore_status + || ObLSRestoreStatus::RESTORE_FAILED == ls_restore_status; +} + int ObILSRestoreState::report_ls_restore_progress_( storage::ObLS &ls, const share::ObLSRestoreStatus &status, const share::ObTaskId &trace_id, const int result, const char *comment) @@ -1578,6 +1590,26 @@ int ObILSRestoreState::report_unfinished_bytes(const int64_t bytes) return ret; } +void ObILSRestoreState::notify_rs_restore_finish_() +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = ls_restore_arg_->tenant_id_; + common::ObAddr leader_addr; + obrpc::ObNotifyLSRestoreFinishArg arg; + arg.set_tenant_id(tenant_id); + arg.set_ls_id(ls_->get_ls_id()); + + if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc proxy or location service is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(GCTX.location_service_)); + } else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout( + GCONF.cluster_id, gen_meta_tenant_id(tenant_id), ObLSID(ObLSID::SYS_LS_ID), leader_addr))) { + LOG_WARN("failed to get meta tenant leader address", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id).notify_ls_restore_finish(arg))) { + LOG_WARN("failed to notify tenant restore scheduler", KR(ret), K(leader_addr), K(arg)); + } +} + //================================ObLSRestoreStartState======================================= ObLSRestoreStartState::ObLSRestoreStartState() : ObILSRestoreState(ObLSRestoreStatus::Status::RESTORE_START) diff --git a/src/storage/restore/ob_ls_restore_handler.h b/src/storage/restore/ob_ls_restore_handler.h index f7ec3564c..efe511a68 100644 --- a/src/storage/restore/ob_ls_restore_handler.h +++ b/src/storage/restore/ob_ls_restore_handler.h @@ -264,6 +264,9 @@ protected: int check_replay_to_target_scn_( const share::SCN &target_scn, bool &replayed) const; + bool need_notify_rs_restore_finish_(const ObLSRestoreStatus &ls_restore_status); + + void notify_rs_restore_finish_(); protected: bool is_inited_;