fixed #1432: complete the notify_ function
This commit is contained in:
parent
ddabe52cdb
commit
a31e422133
@ -908,6 +908,7 @@ PCODE_DEF(OB_DELETE_POLICY, 0x1426)
|
||||
// 0x1427 for OB_RECOVER_TABLE
|
||||
// 0x1428 for OB_BACKUP_ADVANCE_CHECKPOINT
|
||||
// 0x1429 for OB_RECOVER_RESTORE_TABLE_DDL
|
||||
PCODE_DEF(OB_NOTIFY_ARCHIVE, 0x142A)
|
||||
// backup and restore end 0x14ff
|
||||
|
||||
// logservice
|
||||
|
@ -196,12 +196,7 @@ int ObArchiveService::get_ls_archive_speed(const ObLSID &id,
|
||||
return persist_mgr_.get_ls_archive_speed(id, speed, force_wait, ignore);
|
||||
}
|
||||
|
||||
void ObArchiveService::process_start_archive()
|
||||
{
|
||||
cond_.signal();
|
||||
}
|
||||
|
||||
void ObArchiveService::process_stop_archive()
|
||||
void ObArchiveService::wakeup()
|
||||
{
|
||||
cond_.signal();
|
||||
}
|
||||
|
@ -105,9 +105,7 @@ public:
|
||||
int get_ls_archive_speed(const ObLSID &id, int64_t &speed, bool &force_wait, bool &ignore);
|
||||
|
||||
///////////// RPC process functions /////////////////
|
||||
void process_start_archive();
|
||||
|
||||
void process_stop_archive();
|
||||
void wakeup();
|
||||
|
||||
int iterate_ls(const std::function<int(const ObLSArchiveTask&)> &func);
|
||||
|
||||
|
@ -618,6 +618,18 @@ int ObRpcBackupLSCleanP::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcNotifyArchiveP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(gctx_.ob_service_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid argument", KP(gctx_.ob_service_), K(ret));
|
||||
} else {
|
||||
ret = gctx_.ob_service_->notify_archive(arg_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcBackupLSDataP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -155,6 +155,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_BACKUP_META, ObRpcBackupMetaP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_CREATE_DUPLICATE_LS, ObRpcCreateDuplicateLSP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_DELETE_BACKUP_LS_TASK_RES, ObRpcBackupCleanLSResP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_BACKUP_LS_DATA_RES, ObRpcBackupLSDataResP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_ARCHIVE, ObRpcNotifyArchiveP);
|
||||
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_LS_MIGRATE_REPLICA, ObRpcLSMigrateReplicaP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_LS_ADD_REPLICA, ObRpcLSAddReplicaP);
|
||||
|
@ -63,6 +63,7 @@
|
||||
#include "storage/ls/ob_ls.h"
|
||||
#include "logservice/ob_log_service.h" // ObLogService
|
||||
#include "logservice/palf_handle_guard.h" // PalfHandleGuard
|
||||
#include "logservice/archiveservice/ob_archive_service.h"
|
||||
#include "share/scn.h" // PalfHandleGuard
|
||||
#include "storage/backup/ob_backup_handler.h"
|
||||
#include "storage/backup/ob_ls_backup_clean_mgr.h"
|
||||
@ -92,6 +93,7 @@ using namespace share::schema;
|
||||
using namespace storage;
|
||||
using namespace backup;
|
||||
using namespace palf;
|
||||
using namespace archive;
|
||||
|
||||
namespace observer
|
||||
{
|
||||
@ -871,6 +873,32 @@ int ObService::delete_backup_ls_task(const obrpc::ObLSBackupCleanArg &arg)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::notify_archive(const obrpc::ObNotifyArchiveArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
LOG_INFO("recieve notify archive request", K(arg));
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObService not init", K(ret));
|
||||
} else if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(arg));
|
||||
} else {
|
||||
MTL_SWITCH(arg.tenant_id_){
|
||||
archive::ObArchiveService *archive_service = MTL(ObArchiveService*);
|
||||
if (OB_ISNULL(archive_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null MTL scheduler", K(ret), KP(archive_service));
|
||||
} else {
|
||||
archive_service->wakeup();
|
||||
LOG_INFO("succeed to notify archive service", K(arg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObService::check_sys_task_exist(
|
||||
const share::ObTaskId &arg, bool &res)
|
||||
{
|
||||
|
@ -146,6 +146,7 @@ public:
|
||||
int check_sys_task_exist(const share::ObTaskId &arg, bool &res);
|
||||
int check_migrate_task_exist(const share::ObTaskId &arg, bool &res);
|
||||
int delete_backup_ls_task(const obrpc::ObLSBackupCleanArg &arg);
|
||||
int notify_archive(const obrpc::ObNotifyArchiveArg &arg);
|
||||
int report_backup_over(const obrpc::ObBackupTaskRes &res);
|
||||
int report_backup_clean_over(const obrpc::ObBackupTaskRes &res);
|
||||
|
||||
|
@ -88,6 +88,7 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) {
|
||||
RPC_PROCESSOR(ObRpcBackupMetaP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcBackupLSDataResP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcBackupCleanLSResP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcNotifyArchiveP, gctx_);
|
||||
|
||||
RPC_PROCESSOR(ObRpcCheckBackupTaskExistP, gctx_);
|
||||
RPC_PROCESSOR(ObRenewInZoneHbP, gctx_);
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "share/backup/ob_archive_store.h"
|
||||
#include "share/backup/ob_backup_connectivity.h"
|
||||
#include "share/ls/ob_ls_i_life_manager.h"
|
||||
#include "share/ls/ob_ls_operator.h"
|
||||
#include "share/scn.h"
|
||||
#include "share/ob_debug_sync.h"
|
||||
|
||||
@ -547,6 +548,7 @@ int ObArchiveHandler::check_can_do_archive(bool &can) const
|
||||
int ObArchiveHandler::enable_archive(const int64_t dest_no)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool can;
|
||||
bool create;
|
||||
ObTenantArchiveRoundAttr new_round_attr;
|
||||
@ -564,6 +566,11 @@ int ObArchiveHandler::enable_archive(const int64_t dest_no)
|
||||
} else if (OB_FAIL(round_handler_.enable_archive(dest_no, new_round_attr))) {
|
||||
LOG_WARN("failed to enable archive", K(ret), K_(tenant_id), K(dest_no));
|
||||
} else {
|
||||
if (new_round_attr.state_.status_ == ObArchiveRoundState::Status::BEGINNING) {
|
||||
if (OB_TMP_FAIL(notify_(new_round_attr))) {
|
||||
LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr));
|
||||
}
|
||||
}
|
||||
LOG_INFO("enable archive", K(dest_no), K(new_round_attr));
|
||||
}
|
||||
|
||||
@ -580,6 +587,7 @@ int ObArchiveHandler::enable_archive(const int64_t dest_no)
|
||||
int ObArchiveHandler::disable_archive(const int64_t dest_no)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTenantArchiveRoundAttr new_round_attr;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
@ -588,6 +596,9 @@ int ObArchiveHandler::disable_archive(const int64_t dest_no)
|
||||
} else if (OB_FAIL(round_handler_.disable_archive(dest_no, new_round_attr))) {
|
||||
LOG_WARN("failed to disable archive", K(ret), K_(tenant_id), K(dest_no));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(notify_(new_round_attr))) {
|
||||
LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr));
|
||||
}
|
||||
LOG_INFO("disable archive", K(dest_no), K(new_round_attr));
|
||||
}
|
||||
|
||||
@ -604,6 +615,7 @@ int ObArchiveHandler::disable_archive(const int64_t dest_no)
|
||||
int ObArchiveHandler::defer_archive(const int64_t dest_no)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTenantArchiveRoundAttr new_round_attr;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
@ -612,6 +624,9 @@ int ObArchiveHandler::defer_archive(const int64_t dest_no)
|
||||
} else if (OB_FAIL(round_handler_.defer_archive(dest_no, new_round_attr))) {
|
||||
LOG_WARN("failed to defer archive", K(ret), K_(tenant_id), K(dest_no));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(notify_(new_round_attr))) {
|
||||
LOG_WARN("notify failed", K(tmp_ret), K(new_round_attr));
|
||||
}
|
||||
LOG_INFO("defer archive", K(dest_no), K(new_round_attr));
|
||||
}
|
||||
|
||||
@ -755,9 +770,40 @@ int ObArchiveHandler::do_checkpoint_(share::ObTenantArchiveRoundAttr &round_info
|
||||
int ObArchiveHandler::notify_(const ObTenantArchiveRoundAttr &round)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// TODO: notify each log stream.
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
UNUSED(round);
|
||||
// Get all log streams, and try the best to notify each log stream event of archive start.
|
||||
share::ObLSAttrArray ls_array;
|
||||
share::ObLSAttrOperator ls_operator(tenant_id_, sql_proxy_);
|
||||
hash::ObHashSet<ObAddr> notify_addr_set;
|
||||
share::ObLocationService *location_service = GCTX.location_service_;
|
||||
const bool force_renew = true;
|
||||
common::ObAddr leader_addr;
|
||||
obrpc::ObNotifyArchiveArg arg;
|
||||
arg.tenant_id_ = tenant_id_;
|
||||
|
||||
if (OB_FAIL(ls_operator.get_all_ls_by_order(ls_array))) {
|
||||
LOG_WARN("failed to get all ls info", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(notify_addr_set.create(ls_array.count()))) {
|
||||
LOG_WARN("failed to create notify addr set", K(ret));
|
||||
} else {
|
||||
ARRAY_FOREACH_N(ls_array, i, cnt) {
|
||||
const ObLSAttr &ls_attr = ls_array.at(i);
|
||||
if(OB_FAIL(location_service->get_leader(GCONF.cluster_id, tenant_id_, ls_attr.get_ls_id(), force_renew, leader_addr))) {
|
||||
LOG_WARN("failed to get leader addr", K(ret), KP(location_service), "ls_id", ls_attr.get_ls_id());
|
||||
} else if(OB_FAIL(notify_addr_set.set_refactored(leader_addr))) {
|
||||
LOG_WARN("failed to set server_addr in notify_addr_set", K(ret), "ls_id", ls_attr.get_ls_id(), K(leader_addr));
|
||||
}
|
||||
}
|
||||
LOG_INFO("leader_addr_set to be notified archive:", K(notify_addr_set));
|
||||
for (hash::ObHashSet<ObAddr>::const_iterator it = notify_addr_set.begin(); it != notify_addr_set.end(); it++) {
|
||||
if (OB_TMP_FAIL(rpc_proxy_->to(it->first).notify_archive(arg))) {
|
||||
LOG_WARN("failed to notify ls leader archive", K(tmp_ret), K(arg));
|
||||
} else {
|
||||
LOG_INFO("succeed to notify ls leader archive", K(arg), K(it->first));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -6454,6 +6454,24 @@ int ObBackupCleanArg::assign(const ObBackupCleanArg &arg)
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObNotifyArchiveArg, tenant_id_);
|
||||
|
||||
bool ObNotifyArchiveArg::is_valid() const
|
||||
{
|
||||
return is_user_tenant(tenant_id_);
|
||||
}
|
||||
|
||||
int ObNotifyArchiveArg::assign(const ObNotifyArchiveArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if(!arg.is_valid()){
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(arg));
|
||||
} else {
|
||||
tenant_id_ = arg.tenant_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDeletePolicyArg, initiator_tenant_id_, type_, policy_name_, recovery_window_,
|
||||
redundancy_, backup_copies_, clean_tenant_ids_);
|
||||
|
@ -7790,6 +7790,24 @@ private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObBackupCleanArg);
|
||||
};
|
||||
|
||||
struct ObNotifyArchiveArg
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObNotifyArchiveArg() :
|
||||
tenant_id_(common::OB_INVALID_TENANT_ID)
|
||||
{
|
||||
}
|
||||
public:
|
||||
bool is_valid() const;
|
||||
int assign(const ObNotifyArchiveArg &arg);
|
||||
TO_STRING_KV(K_(tenant_id));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObNotifyArchiveArg);
|
||||
};
|
||||
|
||||
struct ObLSBackupCleanArg
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
@ -70,6 +70,7 @@ public:
|
||||
RPC_S(PR5 check_backup_task_exist, OB_CHECK_BACKUP_TASK_EXIST, (ObBackupCheckTaskArg), obrpc::Bool);
|
||||
RPC_S(PR5 report_backup_over, OB_BACKUP_LS_DATA_RES, (ObBackupTaskRes));
|
||||
RPC_S(PR5 report_backup_clean_over, OB_DELETE_BACKUP_LS_TASK_RES, (ObBackupTaskRes));
|
||||
RPC_S(PR5 notify_archive, OB_NOTIFY_ARCHIVE, (ObNotifyArchiveArg));
|
||||
|
||||
// ls disaster recovery rpc
|
||||
RPC_S(PR5 ls_migrate_replica, OB_LS_MIGRATE_REPLICA, (ObLSMigrateReplicaArg));
|
||||
|
Loading…
x
Reference in New Issue
Block a user