diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 5e22fce37..09b76da9e 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -908,6 +908,7 @@ PCODE_DEF(OB_DELETE_POLICY, 0x1426) // 0x1427 for OB_RECOVER_TABLE // 0x1428 for OB_BACKUP_ADVANCE_CHECKPOINT // 0x1429 for OB_RECOVER_RESTORE_TABLE_DDL +PCODE_DEF(OB_NOTIFY_ARCHIVE, 0x142A) // backup and restore end 0x14ff // logservice diff --git a/src/logservice/archiveservice/ob_archive_service.cpp b/src/logservice/archiveservice/ob_archive_service.cpp index 38dc71be0..d05c600a3 100644 --- a/src/logservice/archiveservice/ob_archive_service.cpp +++ b/src/logservice/archiveservice/ob_archive_service.cpp @@ -196,12 +196,7 @@ int ObArchiveService::get_ls_archive_speed(const ObLSID &id, return persist_mgr_.get_ls_archive_speed(id, speed, force_wait, ignore); } -void ObArchiveService::process_start_archive() -{ - cond_.signal(); -} - -void ObArchiveService::process_stop_archive() +void ObArchiveService::wakeup() { cond_.signal(); } diff --git a/src/logservice/archiveservice/ob_archive_service.h b/src/logservice/archiveservice/ob_archive_service.h index b1764fc3f..2d539618c 100644 --- a/src/logservice/archiveservice/ob_archive_service.h +++ b/src/logservice/archiveservice/ob_archive_service.h @@ -105,9 +105,7 @@ public: int get_ls_archive_speed(const ObLSID &id, int64_t &speed, bool &force_wait, bool &ignore); ///////////// RPC process functions ///////////////// - void process_start_archive(); - - void process_stop_archive(); + void wakeup(); int iterate_ls(const std::function &func); diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 3355fda6e..9b3ba9e38 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -618,6 +618,18 @@ int ObRpcBackupLSCleanP::process() return ret; } +int ObRpcNotifyArchiveP::process() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(gctx_.ob_service_)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid argument", KP(gctx_.ob_service_), K(ret)); + } else { + ret = gctx_.ob_service_->notify_archive(arg_); + } + return ret; +} + int ObRpcBackupLSDataP::process() { int ret = OB_SUCCESS; diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 65630600e..b788c7812 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -155,6 +155,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_BACKUP_META, ObRpcBackupMetaP); OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_CREATE_DUPLICATE_LS, ObRpcCreateDuplicateLSP); OB_DEFINE_PROCESSOR_S(Srv, OB_DELETE_BACKUP_LS_TASK_RES, ObRpcBackupCleanLSResP); OB_DEFINE_PROCESSOR_S(Srv, OB_BACKUP_LS_DATA_RES, ObRpcBackupLSDataResP); +OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_ARCHIVE, ObRpcNotifyArchiveP); OB_DEFINE_PROCESSOR_S(Srv, OB_LS_MIGRATE_REPLICA, ObRpcLSMigrateReplicaP); OB_DEFINE_PROCESSOR_S(Srv, OB_LS_ADD_REPLICA, ObRpcLSAddReplicaP); diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index bbeea06b5..57711dca4 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -63,6 +63,7 @@ #include "storage/ls/ob_ls.h" #include "logservice/ob_log_service.h" // ObLogService #include "logservice/palf_handle_guard.h" // PalfHandleGuard +#include "logservice/archiveservice/ob_archive_service.h" #include "share/scn.h" // PalfHandleGuard #include "storage/backup/ob_backup_handler.h" #include "storage/backup/ob_ls_backup_clean_mgr.h" @@ -92,6 +93,7 @@ using namespace share::schema; using namespace storage; using namespace backup; using namespace palf; +using namespace archive; namespace observer { @@ -871,6 +873,32 @@ int ObService::delete_backup_ls_task(const obrpc::ObLSBackupCleanArg &arg) return ret; } +int ObService::notify_archive(const obrpc::ObNotifyArchiveArg &arg) +{ + int ret = OB_SUCCESS; + LOG_INFO("recieve notify archive request", K(arg)); + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("ObService not init", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(arg)); + } else { + MTL_SWITCH(arg.tenant_id_){ + archive::ObArchiveService *archive_service = MTL(ObArchiveService*); + if (OB_ISNULL(archive_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null MTL scheduler", K(ret), KP(archive_service)); + } else { + archive_service->wakeup(); + LOG_INFO("succeed to notify archive service", K(arg)); + } + } + } + + return ret; +} + int ObService::check_sys_task_exist( const share::ObTaskId &arg, bool &res) { diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 3a28c5cad..c1d3025b0 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -146,6 +146,7 @@ public: int check_sys_task_exist(const share::ObTaskId &arg, bool &res); int check_migrate_task_exist(const share::ObTaskId &arg, bool &res); int delete_backup_ls_task(const obrpc::ObLSBackupCleanArg &arg); + int notify_archive(const obrpc::ObNotifyArchiveArg &arg); int report_backup_over(const obrpc::ObBackupTaskRes &res); int report_backup_clean_over(const obrpc::ObBackupTaskRes &res); diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index faaa2569e..0a89f6b7f 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -88,6 +88,7 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObRpcBackupMetaP, gctx_); RPC_PROCESSOR(ObRpcBackupLSDataResP, gctx_); RPC_PROCESSOR(ObRpcBackupCleanLSResP, gctx_); + RPC_PROCESSOR(ObRpcNotifyArchiveP, gctx_); RPC_PROCESSOR(ObRpcCheckBackupTaskExistP, gctx_); RPC_PROCESSOR(ObRenewInZoneHbP, gctx_); diff --git a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp index 204dd6887..e99cc5070 100644 --- a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp +++ b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp @@ -24,6 +24,7 @@ #include "share/backup/ob_archive_store.h" #include "share/backup/ob_backup_connectivity.h" #include "share/ls/ob_ls_i_life_manager.h" +#include "share/ls/ob_ls_operator.h" #include "share/scn.h" #include "share/ob_debug_sync.h" @@ -547,6 +548,7 @@ int ObArchiveHandler::check_can_do_archive(bool &can) const int ObArchiveHandler::enable_archive(const int64_t dest_no) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; bool can; bool create; ObTenantArchiveRoundAttr new_round_attr; @@ -564,6 +566,11 @@ int ObArchiveHandler::enable_archive(const int64_t dest_no) } else if (OB_FAIL(round_handler_.enable_archive(dest_no, new_round_attr))) { LOG_WARN("failed to enable archive", K(ret), K_(tenant_id), K(dest_no)); } else { + if (new_round_attr.state_.status_ == ObArchiveRoundState::Status::BEGINNING) { + if (OB_TMP_FAIL(notify_(new_round_attr))) { + LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr)); + } + } LOG_INFO("enable archive", K(dest_no), K(new_round_attr)); } @@ -580,6 +587,7 @@ int ObArchiveHandler::enable_archive(const int64_t dest_no) int ObArchiveHandler::disable_archive(const int64_t dest_no) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; ObTenantArchiveRoundAttr new_round_attr; if (IS_NOT_INIT) { @@ -588,6 +596,9 @@ int ObArchiveHandler::disable_archive(const int64_t dest_no) } else if (OB_FAIL(round_handler_.disable_archive(dest_no, new_round_attr))) { LOG_WARN("failed to disable archive", K(ret), K_(tenant_id), K(dest_no)); } else { + if (OB_TMP_FAIL(notify_(new_round_attr))) { + LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr)); + } LOG_INFO("disable archive", K(dest_no), K(new_round_attr)); } @@ -604,6 +615,7 @@ int ObArchiveHandler::disable_archive(const int64_t dest_no) int ObArchiveHandler::defer_archive(const int64_t dest_no) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; ObTenantArchiveRoundAttr new_round_attr; if (IS_NOT_INIT) { @@ -612,6 +624,9 @@ int ObArchiveHandler::defer_archive(const int64_t dest_no) } else if (OB_FAIL(round_handler_.defer_archive(dest_no, new_round_attr))) { LOG_WARN("failed to defer archive", K(ret), K_(tenant_id), K(dest_no)); } else { + if (OB_TMP_FAIL(notify_(new_round_attr))) { + LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr)); + } LOG_INFO("defer archive", K(dest_no), K(new_round_attr)); } @@ -755,9 +770,40 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info int ObArchiveHandler::notify_(const ObTenantArchiveRoundAttr &round) { int ret = OB_SUCCESS; - // TODO: notify each log stream. + int tmp_ret = OB_SUCCESS; UNUSED(round); - // Get all log streams, and try the best to notify each log stream event of archive start. + share::ObLSAttrArray ls_array; + share::ObLSAttrOperator ls_operator(tenant_id_, sql_proxy_); + hash::ObHashSet notify_addr_set; + share::ObLocationService *location_service = GCTX.location_service_; + const bool force_renew = true; + common::ObAddr leader_addr; + obrpc::ObNotifyArchiveArg arg; + arg.tenant_id_ = tenant_id_; + + if (OB_FAIL(ls_operator.get_all_ls_by_order(ls_array))) { + LOG_WARN("failed to get all ls info", K(ret), K(tenant_id_)); + } else if (OB_FAIL(notify_addr_set.create(ls_array.count()))) { + LOG_WARN("failed to create notify addr set", K(ret)); + } else { + ARRAY_FOREACH_N(ls_array, i, cnt) { + const ObLSAttr &ls_attr = ls_array.at(i); + if(OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_attr.get_ls_id(), force_renew, leader_addr))) { + LOG_WARN("failed to get leader addr", K(ret), KP(location_service), "ls_id", ls_attr.get_ls_id()); + } else if(OB_FAIL(notify_addr_set.set_refactored(leader_addr))) { + LOG_WARN("failed to set server_addr in notify_addr_set", K(ret), "ls_id", ls_attr.get_ls_id(), K(leader_addr)); + } + } + LOG_INFO("leader_addr_set to be notified archive:", K(notify_addr_set)); + for (hash::ObHashSet::const_iterator it = notify_addr_set.begin(); it != notify_addr_set.end(); it++) { + if (OB_TMP_FAIL(rpc_proxy_->to(it->first).notify_archive(arg))) { + LOG_WARN("failed to notify ls leader archive", K(tmp_ret), K(arg)); + } else { + LOG_INFO("succeed to notify ls leader archive", K(arg), K(it->first)); + } + } + } + return ret; } diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 8cf6ecdbc..6b9686d13 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -6454,6 +6454,24 @@ int ObBackupCleanArg::assign(const ObBackupCleanArg &arg) return ret; } +OB_SERIALIZE_MEMBER(ObNotifyArchiveArg, tenant_id_); + +bool ObNotifyArchiveArg::is_valid() const +{ + return is_user_tenant(tenant_id_); +} + +int ObNotifyArchiveArg::assign(const ObNotifyArchiveArg &arg) +{ + int ret = OB_SUCCESS; + if(!arg.is_valid()){ + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(arg)); + } else { + tenant_id_ = arg.tenant_id_; + } + return ret; +} OB_SERIALIZE_MEMBER(ObDeletePolicyArg, initiator_tenant_id_, type_, policy_name_, recovery_window_, redundancy_, backup_copies_, clean_tenant_ids_); diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 93ef072c6..4bf889437 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -7790,6 +7790,24 @@ private: DISALLOW_COPY_AND_ASSIGN(ObBackupCleanArg); }; +struct ObNotifyArchiveArg +{ + OB_UNIS_VERSION(1); +public: + ObNotifyArchiveArg() : + tenant_id_(common::OB_INVALID_TENANT_ID) + { + } +public: + bool is_valid() const; + int assign(const ObNotifyArchiveArg &arg); + TO_STRING_KV(K_(tenant_id)); +public: + uint64_t tenant_id_; +private: + DISALLOW_COPY_AND_ASSIGN(ObNotifyArchiveArg); +}; + struct ObLSBackupCleanArg { OB_UNIS_VERSION(1); diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 201a7af2c..5da9efd7c 100755 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -70,6 +70,7 @@ public: RPC_S(PR5 check_backup_task_exist, OB_CHECK_BACKUP_TASK_EXIST, (ObBackupCheckTaskArg), obrpc::Bool); RPC_S(PR5 report_backup_over, OB_BACKUP_LS_DATA_RES, (ObBackupTaskRes)); RPC_S(PR5 report_backup_clean_over, OB_DELETE_BACKUP_LS_TASK_RES, (ObBackupTaskRes)); + RPC_S(PR5 notify_archive, OB_NOTIFY_ARCHIVE, (ObNotifyArchiveArg)); // ls disaster recovery rpc RPC_S(PR5 ls_migrate_replica, OB_LS_MIGRATE_REPLICA, (ObLSMigrateReplicaArg));