diff --git a/src/rootserver/ob_alter_locality_finish_checker.cpp b/src/rootserver/ob_alter_locality_finish_checker.cpp index a4770da971..10915b2003 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.cpp +++ b/src/rootserver/ob_alter_locality_finish_checker.cpp @@ -34,7 +34,6 @@ ObAlterLocalityFinishChecker::ObAlterLocalityFinishChecker(volatile bool &stop) schema_service_(NULL), common_rpc_proxy_(NULL), self_(), - unit_mgr_(NULL), zone_mgr_(NULL), sql_proxy_(NULL), stop_(stop) @@ -49,7 +48,6 @@ int ObAlterLocalityFinishChecker::init( share::schema::ObMultiVersionSchemaService &schema_service, obrpc::ObCommonRpcProxy &common_rpc_proxy, common::ObAddr &addr, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, common::ObMySQLProxy &sql_proxy, share::ObLSTableOperator &lst_operator) @@ -65,7 +63,6 @@ int ObAlterLocalityFinishChecker::init( schema_service_ = &schema_service; common_rpc_proxy_ = &common_rpc_proxy; self_ = addr; - unit_mgr_ = &unit_mgr; zone_mgr_ = &zone_mgr; sql_proxy_ = &sql_proxy; lst_operator_ = &lst_operator; @@ -86,11 +83,10 @@ int ObAlterLocalityFinishChecker::check() ret = OB_NOT_INIT; LOG_WARN("ObAlterLocalityFinishChecker not init", KR(ret)); } else if (OB_ISNULL(schema_service_) - || OB_ISNULL(unit_mgr_) || OB_ISNULL(zone_mgr_) || !self_.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), KP_(schema_service), KP_(unit_mgr), KP_(zone_mgr), K_(self)); + LOG_WARN("invalid argument", KR(ret), KP_(schema_service), KP_(zone_mgr), K_(self)); } else if (OB_FAIL(check_stop())) { LOG_WARN("ObAlterLocalityFinishChecker stopped", KR(ret)); } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) { @@ -139,14 +135,12 @@ int ObAlterLocalityFinishChecker::check() } } else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match( tenant_id, - *unit_mgr_, *zone_mgr_, alter_locality_finish))){ LOG_WARN("fail to check tenant locality match", KR(tmp_ret), K(tenant_id), K(alter_locality_finish)); } else if (is_user_tenant(tenant_id) && OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match( gen_meta_tenant_id(tenant_id), - *unit_mgr_, *zone_mgr_, meta_alter_locality_finish))){ LOG_WARN("fail to check tenant locality match", KR(tmp_ret), "meta_tenant_id", diff --git a/src/rootserver/ob_alter_locality_finish_checker.h b/src/rootserver/ob_alter_locality_finish_checker.h index 4dfa0faaf3..24e77f4532 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.h +++ b/src/rootserver/ob_alter_locality_finish_checker.h @@ -58,7 +58,6 @@ public: share::schema::ObMultiVersionSchemaService &schema_service, obrpc::ObCommonRpcProxy &common_rpc_proxy, common::ObAddr &self, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, common::ObMySQLProxy &sql_proxy, share::ObLSTableOperator &lst_operator); @@ -74,7 +73,6 @@ private: share::schema::ObMultiVersionSchemaService *schema_service_; obrpc::ObCommonRpcProxy *common_rpc_proxy_; //use GCTX.rs_rpc_proxy_ common::ObAddr self_; - ObUnitManager *unit_mgr_; ObZoneManager *zone_mgr_; common::ObMySQLProxy *sql_proxy_; share::ObLSTableOperator *lst_operator_; diff --git a/src/rootserver/ob_disaster_recovery_info.cpp b/src/rootserver/ob_disaster_recovery_info.cpp index 7d0ea670fa..07c956ac9a 100644 --- a/src/rootserver/ob_disaster_recovery_info.cpp +++ b/src/rootserver/ob_disaster_recovery_info.cpp @@ -63,21 +63,21 @@ int DRServerStatInfo::assign( int DRUnitStatInfo::init( const uint64_t unit_id, const bool in_pool, - const share::ObUnitInfo &unit_info, + const 
share::ObUnit &unit, DRServerStatInfo *server_stat, const int64_t outside_replica_cnt) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_ID == unit_id - || !unit_info.is_valid() + || !unit.is_valid() || nullptr == server_stat)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(unit_id), K(unit_info), KP(server_stat)); + LOG_WARN("invalid argument", KR(ret), K(unit_id), K(unit), KP(server_stat)); } else { unit_id_ = unit_id; in_pool_ = in_pool; - if (OB_FAIL(unit_info_.assign(unit_info))) { - LOG_WARN("fail to assign", KR(ret)); + if (OB_FAIL(unit_.assign(unit))) { + LOG_WARN("fail to assign", KR(ret), K(unit)); } else { server_stat_ = server_stat; outside_replica_cnt_ = outside_replica_cnt; @@ -90,11 +90,11 @@ int DRUnitStatInfo::assign( const DRUnitStatInfo &that) { int ret = OB_SUCCESS; - this->unit_id_ = that.unit_id_; - this->in_pool_ = that.in_pool_; - if (OB_FAIL(this->unit_info_.assign(that.unit_info_))) { - LOG_WARN("fail to assign", KR(ret)); + if (OB_FAIL(this->unit_.assign(that.unit_))) { + LOG_WARN("fail to assign unit", KR(ret), K(that.unit_)); } else { + this->unit_id_ = that.unit_id_; + this->in_pool_ = that.in_pool_; this->server_stat_ = that.server_stat_; this->outside_replica_cnt_ = that.outside_replica_cnt_; } @@ -113,6 +113,11 @@ int DRLSInfo::init() } else if (OB_UNLIKELY(nullptr == schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(unit_operator_.init(*GCTX.sql_proxy_))) { + LOG_WARN("unit operator init failed", KR(ret)); } else if (OB_FAIL(unit_stat_info_map_.init( UNIT_MAP_BUCKET_NUM))) { LOG_WARN("fail to init unit stat info map", KR(ret)); @@ -314,32 +319,29 @@ int DRLSInfo::fill_servers() int DRLSInfo::fill_units() { int ret = OB_SUCCESS; - common::ObArray unit_info_array; - if (OB_UNLIKELY(nullptr == unit_mgr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_)); - } else if (OB_UNLIKELY(OB_INVALID_ID == resource_tenant_id_)) { + common::ObArray unit_array; + if (OB_UNLIKELY(OB_INVALID_ID == resource_tenant_id_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(resource_tenant_id_)); - } else if (OB_FAIL(unit_mgr_->get_all_unit_infos_by_tenant( - resource_tenant_id_, unit_info_array))) { + } else if (OB_FAIL(unit_operator_.get_units_by_tenant( + resource_tenant_id_, unit_array))) { LOG_WARN("fail to get all unit infos by tenant", KR(ret), K(resource_tenant_id_)); } else { - FOREACH_X(u, unit_info_array, OB_SUCC(ret)) { + FOREACH_X(u, unit_array, OB_SUCC(ret)) { UnitStatInfoMap::Item *item = nullptr; ServerStatInfoMap::Item *server_item = nullptr; if (OB_UNLIKELY(nullptr == u)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("u ptr is null", KR(ret)); - } else if (OB_FAIL(unit_stat_info_map_.locate(u->unit_.unit_id_, item))) { - LOG_WARN("fail to locate unit", KR(ret), "unit_id", u->unit_.unit_id_); - } else if (OB_FAIL(server_stat_info_map_.locate(u->unit_.server_, server_item))) { - LOG_WARN("fail to locate server", KR(ret), "server", u->unit_.server_); + } else if (OB_FAIL(unit_stat_info_map_.locate(u->unit_id_, item))) { + LOG_WARN("fail to locate unit", KR(ret), "unit_id", u->unit_id_); + } else if (OB_FAIL(server_stat_info_map_.locate(u->server_, server_item))) { + LOG_WARN("fail to locate server", KR(ret), "server", u->server_); } else if (OB_UNLIKELY(nullptr == item || 
nullptr == server_item)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("item ptr is null", KR(ret)); } else if (OB_FAIL(item->v_.init( - u->unit_.unit_id_, + u->unit_id_, true, /*in pool*/ *u, &server_item->v_, @@ -445,9 +447,9 @@ int DRLSInfo::build_disaster_ls_info( if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("DRWorker not init", KR(ret)); - } else if (OB_ISNULL(schema_service_) || OB_ISNULL(unit_mgr_)) { + } else if (OB_ISNULL(schema_service_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_), KP(unit_mgr_)); + LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_)); } else if (resource_tenant_id_ != gen_user_tenant_id(ls_info.get_tenant_id())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("tenant id not match", KR(ret), K(resource_tenant_id_), @@ -479,37 +481,36 @@ int DRLSInfo::build_disaster_ls_info( } else { for (int64_t i = 0; OB_SUCC(ret) && i < inner_ls_info_.get_replicas().count(); ++i) { ServerStatInfoMap::Item *server = nullptr; - UnitStatInfoMap::Item *unit = nullptr; + UnitStatInfoMap::Item *unit_in_map = nullptr; UnitStatInfoMap::Item *unit_in_group = nullptr; - share::ObUnitInfo unit_info; + share::ObUnit unit; share::ObLSReplica &ls_replica = inner_ls_info_.get_replicas().at(i); if (!ls_replica.get_in_member_list() && !ls_replica.get_in_learner_list()) { LOG_INFO("replica is neither in member list nor in learner list", K(ls_replica)); } else if (OB_FAIL(server_stat_info_map_.locate(ls_replica.get_server(), server))) { LOG_WARN("fail to locate server", KR(ret), "server", ls_replica.get_server()); - } else if (OB_FAIL(unit_stat_info_map_.locate(ls_replica.get_unit_id(), unit))) { + } else if (OB_FAIL(unit_stat_info_map_.locate(ls_replica.get_unit_id(), unit_in_map))) { LOG_WARN("fail to locate unit", KR(ret), "unit_id", ls_replica.get_unit_id()); } else { if (0 == ls_status_info.unit_group_id_) { - unit_in_group = unit; - } else if (OB_FAIL(unit_mgr_->get_unit_in_group( - ls_replica.get_tenant_id(), + unit_in_group = unit_in_map; + } else if (OB_FAIL(unit_operator_.get_unit_in_group( ls_status_info.unit_group_id_, ls_replica.get_zone(), - unit_info))) { + unit))) { LOG_WARN("fail to get unit in group", KR(ret), K(ls_replica)); - } else if (OB_FAIL(unit_stat_info_map_.locate(unit_info.unit_.unit_id_, unit_in_group))) { - LOG_WARN("fail to locate unit", KR(ret), "unit_id", unit_info.unit_.unit_id_); + } else if (OB_FAIL(unit_stat_info_map_.locate(unit.unit_id_, unit_in_group))) { + LOG_WARN("fail to locate unit", KR(ret), "unit_id", unit.unit_id_); } if (OB_SUCC(ret)) { - if (OB_ISNULL(server) || OB_ISNULL(unit) || OB_ISNULL(unit_in_group)) { + if (OB_ISNULL(server) || OB_ISNULL(unit_in_map) || OB_ISNULL(unit_in_group)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unit or server ptr is null", KR(ret), KP(server), KP(unit), K(ls_replica)); + LOG_WARN("unit or server ptr is null", KR(ret), KP(server), KP(unit_in_map), K(ls_replica)); } else if (OB_FAIL(append_replica_server_unit_stat( - &server->v_, &unit->v_, &unit_in_group->v_))) { + &server->v_, &unit_in_map->v_, &unit_in_group->v_))) { LOG_WARN("fail to append replica server/unit stat", KR(ret), - "server_stat_info", server->v_, "unit_stat_info", unit->v_); + "server_stat_info", server->v_, "unit_stat_info", unit_in_map->v_); } } } diff --git a/src/rootserver/ob_disaster_recovery_info.h b/src/rootserver/ob_disaster_recovery_info.h index 83c4370e3a..53218bb903 100644 --- a/src/rootserver/ob_disaster_recovery_info.h +++ b/src/rootserver/ob_disaster_recovery_info.h @@ -91,7 
+91,7 @@ struct DRUnitStatInfo public: DRUnitStatInfo() : unit_id_(common::OB_INVALID_ID), in_pool_(false), - unit_info_(), + unit_(), server_stat_(nullptr), outside_replica_cnt_(0) {} public: @@ -101,7 +101,7 @@ public: TO_STRING_KV(K_(unit_id), K_(in_pool), - K_(unit_info), + K_(unit), KPC_(server_stat)); int assign( @@ -110,20 +110,20 @@ public: int init( const uint64_t unit_id, const bool in_pool, - const share::ObUnitInfo &unit_info, + const share::ObUnit &unit, DRServerStatInfo *server_stat, const int64_t outside_replica_cnt); public: uint64_t get_unit_id() const { return unit_id_; } bool is_in_pool() const { return in_pool_; } - const share::ObUnitInfo &get_unit_info() const { return unit_info_; } + const share::ObUnit &get_unit() const { return unit_; } const DRServerStatInfo *get_server_stat() const { return server_stat_; } int64_t get_outside_replica_cnt() const { return outside_replica_cnt_; } void inc_outside_replica_cnt() { ++outside_replica_cnt_; } private: uint64_t unit_id_; bool in_pool_; - share::ObUnitInfo unit_info_; + share::ObUnit unit_; DRServerStatInfo *server_stat_; int64_t outside_replica_cnt_; }; @@ -139,12 +139,10 @@ class DRLSInfo { public: DRLSInfo(const uint64_t resource_tenant_id, - ObUnitManager *unit_mgr, ObZoneManager *zone_mgr, share::schema::ObMultiVersionSchemaService *schema_service) : resource_tenant_id_(resource_tenant_id), sys_schema_guard_(), - unit_mgr_(unit_mgr), zone_mgr_(zone_mgr), schema_service_(schema_service), unit_stat_info_map_("DRUnitStatMap"), @@ -163,7 +161,7 @@ public: inited_(false) {} virtual ~DRLSInfo() {} public: - // use user_tenant_id to init unit_info and locality + // use user_tenant_id to init unit and locality int init(); int build_disaster_ls_info( const share::ObLSInfo &ls_info, @@ -241,7 +239,7 @@ private: private: uint64_t resource_tenant_id_; share::schema::ObSchemaGetterGuard sys_schema_guard_; - ObUnitManager *unit_mgr_; + share::ObUnitTableOperator unit_operator_; ObZoneManager *zone_mgr_; share::schema::ObMultiVersionSchemaService *schema_service_; UnitStatInfoMap unit_stat_info_map_; diff --git a/src/rootserver/ob_disaster_recovery_worker.cpp b/src/rootserver/ob_disaster_recovery_worker.cpp index 611feecf0b..db759754f7 100755 --- a/src/rootserver/ob_disaster_recovery_worker.cpp +++ b/src/rootserver/ob_disaster_recovery_worker.cpp @@ -221,12 +221,10 @@ int ObDRWorker::LocalityAlignment::locate_zone_locality( return ret; } -ObDRWorker::LocalityAlignment::LocalityAlignment(ObUnitManager *unit_mgr, - ObZoneManager *zone_mgr, +ObDRWorker::LocalityAlignment::LocalityAlignment(ObZoneManager *zone_mgr, DRLSInfo &dr_ls_info) : task_idx_(0), add_replica_task_(), - unit_mgr_(unit_mgr), zone_mgr_(zone_mgr), dr_ls_info_(dr_ls_info), task_array_(), @@ -274,10 +272,7 @@ int ObDRWorker::LocalityAlignment::build_locality_stat_map() int ret = OB_SUCCESS; uint64_t tenant_id = OB_INVALID_ID; share::ObLSID ls_id; - if (OB_ISNULL(unit_mgr_)) { - ret = OB_NOT_INIT; - LOG_WARN("LocalityAlignment not init", KR(ret), KP(unit_mgr_)); - } else if (OB_FAIL(dr_ls_info_.get_ls_id(tenant_id, ls_id))) { + if (OB_FAIL(dr_ls_info_.get_ls_id(tenant_id, ls_id))) { LOG_WARN("fail to get ls id", KR(ret)); } else { LOG_INFO("build ls locality stat map", K(tenant_id), K(ls_id)); @@ -464,8 +459,8 @@ int ObDRWorker::LocalityAlignment::try_remove_match( LOG_WARN("zone replica desc ptr is null", KR(ret), K(zone)); } else { bool has_correct_dest_replica = false; - if (replica->get_server() != unit_stat_info->get_unit_info().unit_.server_ - || 
!unit_stat_info->get_unit_info().unit_.is_active_status()) { + if (replica->get_server() != unit_stat_info->get_unit().server_ + || !unit_stat_info->get_unit().is_active_status()) { // this replica is migrating or unit is deleting, check whether has a correct dest replica LOG_TRACE("try to check whether has dest replica", KPC(replica), KPC(unit_stat_info)); const int64_t map_count = replica_stat_map_.count(); @@ -482,9 +477,9 @@ int ObDRWorker::LocalityAlignment::try_remove_match( } else if (replica->is_in_service() && replica_stat_desc_to_compare.replica_->get_replica_type() == replica->get_replica_type() && replica_stat_desc_to_compare.replica_->get_server() != replica->get_server() - && (replica_stat_desc_to_compare.replica_->get_server() == unit_stat_info->get_unit_info().unit_.server_ - || replica_stat_desc_to_compare.replica_->get_server() == replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.server_) - && replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.is_active_status()) { + && (replica_stat_desc_to_compare.replica_->get_server() == unit_stat_info->get_unit().server_ + || replica_stat_desc_to_compare.replica_->get_server() == replica_stat_desc_to_compare.unit_stat_info_->get_unit().server_) + && replica_stat_desc_to_compare.unit_stat_info_->get_unit().is_active_status()) { // A replica is a correct dest replica if these conditions above all satisfied // (1) replica is in member_list(learner_lsit) // (2) replica type is expected @@ -499,9 +494,9 @@ int ObDRWorker::LocalityAlignment::try_remove_match( "replica_type", replica->get_replica_type(), "server_to_compare", replica_stat_desc_to_compare.replica_->get_server(), "server", replica->get_server(), - "server_with_unit", unit_stat_info->get_unit_info().unit_.server_, - "server_with_unit_to_compare", replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.server_, - "unit_status_is_active", replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.is_active_status()); + "server_with_unit", unit_stat_info->get_unit().server_, + "server_with_unit_to_compare", replica_stat_desc_to_compare.unit_stat_info_->get_unit().server_, + "unit_status_is_active", replica_stat_desc_to_compare.unit_stat_info_->get_unit().is_active_status()); } } } @@ -514,8 +509,8 @@ int ObDRWorker::LocalityAlignment::try_remove_match( && replica->get_memstore_percent() == replica_desc.memstore_percent_ && replica_desc.replica_num_ > 0 && (!has_correct_dest_replica - || (unit_stat_info->get_unit_info().unit_.is_active_status() - && server_stat_info->get_server() == unit_stat_info->get_unit_info().unit_.server_))) { + || (unit_stat_info->get_unit().is_active_status() + && server_stat_info->get_server() == unit_stat_info->get_unit().server_))) { found = true; if (OB_FAIL(replica_stat_map_.remove(index))) { LOG_WARN("fail to remove from stat map", KR(ret)); @@ -741,14 +736,14 @@ int ObDRWorker::LocalityAlignment::try_generate_type_transform_task_for_readonly ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(replica_stat_desc)); } else { - const share::ObUnitInfo &unit_info = replica_stat_desc.unit_stat_info_->get_unit_info(); + const share::ObUnit &unit = replica_stat_desc.unit_stat_info_->get_unit(); bool server_is_active = false; - if (!unit_info.unit_.is_active_status()) { - FLOG_INFO("unit status is not normal, can not generate type transform task", K(unit_info)); - } else if (OB_FAIL(SVR_TRACER.check_server_active(unit_info.unit_.server_, server_is_active))) { - LOG_WARN("fail to check 
server is active", KR(ret), K(unit_info)); + if (!unit.is_active_status()) { + FLOG_INFO("unit status is not normal, can not generate type transform task", K(unit)); + } else if (OB_FAIL(SVR_TRACER.check_server_active(unit.server_, server_is_active))) { + LOG_WARN("fail to check server is active", KR(ret), K(unit)); } else if (!server_is_active) { - FLOG_INFO("server status is not active, can not generate type transform task", K(unit_info)); + FLOG_INFO("server status is not active, can not generate type transform task", K(unit)); } else if (OB_FAIL(generate_type_transform_task( replica_stat_desc, replica_desc.replica_type_, @@ -1053,8 +1048,8 @@ int ObDRWorker::LocalityAlignment::generate_type_transform_task( } else { task->zone_ = replica->get_zone(); task->dst_server_ = replica->get_server(); - task->unit_id_ = unit_stat_info->get_unit_info().unit_.unit_id_; - task->unit_group_id_ = unit_stat_info->get_unit_info().unit_.unit_group_id_; + task->unit_id_ = unit_stat_info->get_unit().unit_id_; + task->unit_group_id_ = unit_stat_info->get_unit().unit_group_id_; task->src_replica_type_ = replica->get_replica_type(); task->src_memstore_percent_ = replica->get_memstore_percent(); task->src_member_time_us_ = replica->get_member_time_us(); @@ -1139,10 +1134,9 @@ int ObDRWorker::LocalityAlignment::build() { int ret = OB_SUCCESS; uint64_t tenant_id = OB_INVALID_ID; - if (OB_UNLIKELY(nullptr == unit_mgr_ - || nullptr == zone_mgr_)) { + if (OB_UNLIKELY(nullptr == zone_mgr_)) { ret = OB_NOT_INIT; - LOG_WARN("LocalityAlignment not init", KR(ret), KP(unit_mgr_), KP(zone_mgr_)); + LOG_WARN("LocalityAlignment not init", KR(ret), KP(zone_mgr_)); } else if (OB_FAIL(locality_map_.create(LOCALITY_MAP_BUCKET_NUM, "LocAlign"))) { LOG_WARN("fail to create locality map", KR(ret)); } else if (OB_FAIL(generate_paxos_replica_number())) { @@ -1156,7 +1150,7 @@ int ObDRWorker::LocalityAlignment::build() } else if (OB_FAIL(dr_ls_info_.get_tenant_id(tenant_id))) { LOG_WARN("fail to get tenant id", KR(ret), K(tenant_id)); } else if (OB_FAIL(unit_provider_.init(gen_user_tenant_id(tenant_id), - dr_ls_info_, unit_mgr_))) { + dr_ls_info_))) { LOG_WARN("fail to init unit provider", KR(ret), K(tenant_id), K_(dr_ls_info)); } return ret; @@ -1222,14 +1216,14 @@ int ObDRWorker::LocalityAlignment::try_review_add_replica_task( LOG_WARN("ls status info ptr is null", KR(ret), KP(ls_status_info)); } else { found = false; - share::ObUnitInfo unit_info; + share::ObUnit unit; const common::ObZone &zone = my_task->zone_; - int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit_info); + int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit); if (OB_ITER_END == tmp_ret) { // bypass } else if (OB_SUCCESS == tmp_ret) { - my_task->dst_server_ = unit_info.unit_.server_; - my_task->unit_id_ = unit_info.unit_.unit_id_; + my_task->dst_server_ = unit.server_; + my_task->unit_id_ = unit.unit_id_; my_task->unit_group_id_ = ls_status_info->unit_group_id_; if (ObReplicaTypeCheck::is_paxos_replica_V2(my_task->replica_type_)) { int64_t new_paxos_replica_number = 0; @@ -1457,15 +1451,15 @@ int ObDRWorker::LocalityAlignment::try_get_readonly_all_server_locality_alignmen ret = OB_ERR_UNEXPECTED; LOG_WARN("ls status info ptr is null", KR(ret), KP(ls_status_info)); } else { - share::ObUnitInfo unit_info; - int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit_info); + share::ObUnit unit; + int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, 
unit); if (OB_ITER_END == tmp_ret) { // bypass } else if (OB_SUCCESS == tmp_ret) { add_replica_task_.zone_ = zone; - add_replica_task_.dst_server_ = unit_info.unit_.server_; + add_replica_task_.dst_server_ = unit.server_; add_replica_task_.member_time_us_ = ObTimeUtility::current_time(); - add_replica_task_.unit_id_ = unit_info.unit_.unit_id_; + add_replica_task_.unit_id_ = unit.unit_id_; add_replica_task_.unit_group_id_ = ls_status_info->unit_group_id_; add_replica_task_.replica_type_ = REPLICA_TYPE_READONLY; add_replica_task_.memstore_percent_ = replica_desc_array->readonly_memstore_percent_; @@ -1512,21 +1506,24 @@ int ObDRWorker::LocalityAlignment::get_next_locality_alignment_task( int ObDRWorker::UnitProvider::init( const uint64_t tenant_id, - DRLSInfo &dr_ls_info, - ObUnitManager *unit_mgr) + DRLSInfo &dr_ls_info) { int ret = OB_SUCCESS; int64_t replica_cnt = 0; if (OB_UNLIKELY(inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", KR(ret)); - } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || nullptr == unit_mgr)) { + } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), - K(tenant_id), - KP(unit_mgr)); + K(tenant_id)); } else if (OB_FAIL(dr_ls_info.get_replica_cnt(replica_cnt))) { LOG_WARN("failed to get replica count", KR(ret)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(unit_operator_.init(*GCTX.sql_proxy_))) { + LOG_WARN("init unit table operator failed", KR(ret)); } else { const int64_t hash_count = max(replica_cnt, 1); if (OB_FAIL(unit_set_.create(hash::cal_next_prime(hash_count)))) { @@ -1535,7 +1532,6 @@ int ObDRWorker::UnitProvider::init( LOG_WARN("failed to init unit set", KR(ret), K(dr_ls_info)); } else { tenant_id_ = tenant_id; - unit_mgr_ = unit_mgr; inited_ = true; } } @@ -1578,7 +1574,7 @@ int ObDRWorker::UnitProvider::init_unit_set( } else if ((ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type()) && ls_replica->get_in_member_list()) || (!ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type()))) { - if (OB_FAIL(unit_set_.set_refactored(unit_stat_info->get_unit_info().unit_.unit_id_))) { + if (OB_FAIL(unit_set_.set_refactored(unit_stat_info->get_unit().unit_id_))) { LOG_WARN("fail to set refactored", KR(ret)); } } @@ -1589,40 +1585,40 @@ int ObDRWorker::UnitProvider::init_unit_set( int ObDRWorker::UnitProvider::inner_get_valid_unit_( const common::ObZone &zone, - const common::ObArray &unit_array, - share::ObUnitInfo &output_unit_info, + const common::ObArray &unit_array, + share::ObUnit &output_unit, const bool &force_get, bool &found) { int ret = OB_SUCCESS; - output_unit_info.reset(); + output_unit.reset(); found = false; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (OB_ISNULL(unit_mgr_) || OB_UNLIKELY(0 >= unit_array.count())) { + } else if (OB_UNLIKELY(0 >= unit_array.count())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_), "unit_count", unit_array.count()); + LOG_WARN("unit mgr ptr is null", KR(ret), "unit_count", unit_array.count()); } else { bool server_is_active = false; for (int64_t i = 0; OB_SUCC(ret) && i < unit_array.count(); ++i) { server_is_active = false; - const share::ObUnitInfo &unit_info = unit_array.at(i); - const uint64_t unit_id = unit_info.unit_.unit_id_; + const share::ObUnit &unit = unit_array.at(i); + const uint64_t unit_id = unit.unit_id_; int 
hash_ret = OB_SUCCESS; bool server_and_unit_status_is_valid = true; - if (unit_info.unit_.zone_ != zone) { + if (unit.zone_ != zone) { // bypass, because we do not support operation between different zones } else { if (!force_get) { - if (OB_FAIL(SVR_TRACER.check_server_active(unit_info.unit_.server_, server_is_active))) { - LOG_WARN("fail to check server active", KR(ret), "server", unit_info.unit_.server_); + if (OB_FAIL(SVR_TRACER.check_server_active(unit.server_, server_is_active))) { + LOG_WARN("fail to check server active", KR(ret), "server", unit.server_); } else if (!server_is_active) { server_and_unit_status_is_valid = false; - FLOG_INFO("server is not active", "server", unit_info.unit_.server_, K(server_is_active)); - } else if (!unit_info.unit_.is_active_status()) { + FLOG_INFO("server is not active", "server", unit.server_, K(server_is_active)); + } else if (!unit.is_active_status()) { server_and_unit_status_is_valid = false; - FLOG_INFO("unit status is not normal", K(unit_info)); + FLOG_INFO("unit status is not normal", K(unit)); } else { server_and_unit_status_is_valid = true; } @@ -1633,8 +1629,8 @@ int ObDRWorker::UnitProvider::inner_get_valid_unit_( FLOG_INFO("unit existed", K(unit_id)); } else if (OB_FAIL(hash_ret)) { LOG_WARN("set refactored failed", KR(ret), KR(hash_ret)); - } else if (OB_FAIL(output_unit_info.assign(unit_info))) { - LOG_WARN("fail to assign unit info", KR(ret), K(unit_info)); + } else if (OB_FAIL(output_unit.assign(unit))) { + LOG_WARN("fail to assign unit info", KR(ret), K(unit)); } else { found = true; break; @@ -1648,31 +1644,31 @@ int ObDRWorker::UnitProvider::inner_get_valid_unit_( int ObDRWorker::UnitProvider::allocate_unit( const common::ObZone &zone, const uint64_t unit_group_id, - share::ObUnitInfo &unit_info) + share::ObUnit &unit) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); } else { - common::ObArray unit_array; + common::ObArray unit_array; bool found = false; bool force_get = true; // if unit_group_id is given, just allocate unit belongs to this unit group // 1. if unit_group_id is valid, try get valid unit in this unit group if (unit_group_id > 0) { force_get = true; - if (OB_FAIL(unit_mgr_->get_unit_group(tenant_id_, unit_group_id, unit_array))) { - LOG_WARN("fail to get unit group", KR(ret), K(tenant_id_), K(unit_group_id)); - } else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit_info, force_get, found))) { + if (OB_FAIL(unit_operator_.get_units_by_unit_group_id(unit_group_id, unit_array))) { + LOG_WARN("fail to get unit group", KR(ret), K(unit_group_id)); + } else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit, force_get, found))) { LOG_WARN("fail to get valid unit from certain unit group", KR(ret), K(zone), K(unit_array), K(force_get)); } } else { // 2. 
if unit_group_id = 0, try get from all units unit_array.reset(); force_get = false; - if (OB_FAIL(unit_mgr_->get_all_unit_infos_by_tenant(tenant_id_, unit_array))) { + if (OB_FAIL(unit_operator_.get_units_by_tenant(tenant_id_, unit_array))) { LOG_WARN("fail to get ll unit infos by tenant", KR(ret), K(tenant_id_)); - } else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit_info, force_get, found))) { + } else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit, force_get, found))) { LOG_WARN("fail to get valid unit from all units in tenant", KR(ret), K(zone), K(unit_array), K(force_get)); } } @@ -1709,7 +1705,6 @@ ObDRWorker::ObDRWorker(volatile bool &stop) dr_task_mgr_is_loaded_(false), self_addr_(), config_(nullptr), - unit_mgr_(nullptr), zone_mgr_(nullptr), disaster_recovery_task_mgr_(nullptr), lst_operator_(nullptr), @@ -1729,7 +1724,6 @@ ObDRWorker::~ObDRWorker() int ObDRWorker::init( common::ObAddr &self_addr, common::ObServerConfig &config, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, ObDRTaskMgr &task_mgr, share::ObLSTableOperator &lst_operator, @@ -1747,7 +1741,6 @@ int ObDRWorker::init( } else { self_addr_ = self_addr; config_ = &config; - unit_mgr_ = &unit_mgr; zone_mgr_ = &zone_mgr; disaster_recovery_task_mgr_ = &task_mgr; lst_operator_ = &lst_operator; @@ -1841,7 +1834,6 @@ void ObDRWorker::statistic_total_dr_task(const int64_t task_cnt) int ObDRWorker::check_tenant_locality_match( const uint64_t tenant_id, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, bool &locality_is_matched) { @@ -1868,7 +1860,6 @@ int ObDRWorker::check_tenant_locality_match( share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i); bool filter_readonly_replicas_with_flag = true; DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id), - &unit_mgr, &zone_mgr, GCTX.schema_service_); if (ls_status_info.ls_is_creating()) { @@ -1889,7 +1880,7 @@ int ObDRWorker::check_tenant_locality_match( LOG_WARN("fail to generate dr log stream info", KR(ret), K(ls_info), K(ls_status_info), K(filter_readonly_replicas_with_flag)); } else if (OB_FAIL(check_ls_locality_match_( - dr_ls_info, unit_mgr, zone_mgr, locality_is_matched))) { + dr_ls_info, zone_mgr, locality_is_matched))) { LOG_WARN("fail to try log stream disaster recovery", KR(ret)); } } @@ -1900,7 +1891,6 @@ int ObDRWorker::check_tenant_locality_match( int ObDRWorker::check_ls_locality_match_( DRLSInfo &dr_ls_info, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, bool &locality_is_matched) { @@ -1908,8 +1898,7 @@ int ObDRWorker::check_ls_locality_match_( int tmp_ret = OB_SUCCESS; locality_is_matched = false; LOG_INFO("start to check ls locality match", K(dr_ls_info)); - LocalityAlignment locality_alignment(&unit_mgr, - &zone_mgr, + LocalityAlignment locality_alignment(&zone_mgr, dr_ls_info); if (!dr_ls_info.has_leader()) { LOG_WARN("has no leader, maybe not report yet", @@ -2031,12 +2020,10 @@ int ObDRWorker::try_tenant_disaster_recovery( share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i); // this structure is used to generate migrtion/locality alignment/shrink unit tasks DRLSInfo dr_ls_info_without_flag(gen_user_tenant_id(tenant_id), - unit_mgr_, zone_mgr_, schema_service_); // this structure is used to generate permanent offline tasks DRLSInfo dr_ls_info_with_flag(gen_user_tenant_id(tenant_id), - unit_mgr_, zone_mgr_, schema_service_); int64_t ls_acc_dr_task = 0; @@ -2660,7 +2647,7 @@ int ObDRWorker::check_need_generate_replicate_to_unit( } else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status() && 
unit_stat_info->is_in_pool() && !server_stat_info->is_alive() - && unit_stat_info->get_unit_info().unit_.server_ != server_stat_info->get_server() + && unit_stat_info->get_unit().server_ != server_stat_info->get_server() && unit_stat_info->get_server_stat()->is_alive() && !unit_stat_info->get_server_stat()->is_block()) { need_generate = true; @@ -2700,8 +2687,8 @@ int ObDRWorker::construct_extra_infos_to_build_migrate_task( data_size))) { LOG_WARN("fail to choose disaster recovery data source", KR(ret)); } else if (OB_FAIL(dst_replica.assign( - unit_stat_info.get_unit_info().unit_.unit_id_, - unit_in_group_stat_info.get_unit_info().unit_.unit_group_id_, + unit_stat_info.get_unit().unit_id_, + unit_in_group_stat_info.get_unit().unit_group_id_, ls_replica.get_zone(), dst_member))) { LOG_WARN("fail to assign dst replica", KR(ret)); @@ -2803,7 +2790,7 @@ int ObDRWorker::try_replicate_to_unit( need_generate))) { LOG_WARN("fail to check need generate replicate to unit task", KR(ret)); } else if (need_generate) { - ObReplicaMember dst_member(unit_stat_info->get_unit_info().unit_.server_, + ObReplicaMember dst_member(unit_stat_info->get_unit().server_, ObTimeUtility::current_time(), ls_replica->get_replica_type(), ls_replica->get_memstore_percent()); @@ -2865,11 +2852,11 @@ int ObDRWorker::generate_migrate_ls_task( if (OB_FAIL(display_info.init( tenant_id, ls_id, ObDRTaskType::LS_MIGRATE_REPLICA, ObDRTaskPriority::HIGH_PRI, - unit_stat_info.get_unit_info().unit_.server_, + unit_stat_info.get_unit().server_, ls_replica.get_replica_type(), old_paxos_replica_number, ls_replica.get_server(), ls_replica.get_replica_type(), old_paxos_replica_number, - unit_stat_info.get_unit_info().unit_.server_, + unit_stat_info.get_unit().server_, task_comment))) { LOG_WARN("fail to init a ObLSReplicaTaskDisplayInfo", KR(ret)); } else if (OB_FAIL(add_display_info(display_info))) { @@ -3436,7 +3423,7 @@ int ObDRWorker::try_locality_alignment( int ret = OB_SUCCESS; DEBUG_SYNC(BEFORE_TRY_LOCALITY_ALIGNMENT); LOG_INFO("try locality alignment", K(dr_ls_info), K(only_for_display)); - LocalityAlignment locality_alignment(unit_mgr_, zone_mgr_, dr_ls_info); + LocalityAlignment locality_alignment(zone_mgr_, dr_ls_info); const LATask *task = nullptr; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; @@ -3526,8 +3513,7 @@ int ObDRWorker::try_shrink_resource_pools( } else { ObDRWorker::UnitProvider unit_provider; const uint64_t tenant_id = ls_status_info->tenant_id_; - if (OB_FAIL(unit_provider.init(gen_user_tenant_id(tenant_id), dr_ls_info, - unit_mgr_))) { + if (OB_FAIL(unit_provider.init(gen_user_tenant_id(tenant_id), dr_ls_info))) { LOG_WARN("fail to init unit provider", KR(ret), K(tenant_id), K(dr_ls_info)); } for (int64_t index = 0; OB_SUCC(ret) && index < replica_cnt; ++index) { @@ -3558,7 +3544,7 @@ int ObDRWorker::try_shrink_resource_pools( KP(unit_stat_info), KP(unit_in_group_stat_info)); } else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status() - && share::ObUnit::UNIT_STATUS_DELETING == unit_stat_info->get_unit_info().unit_.status_) { + && share::ObUnit::UNIT_STATUS_DELETING == unit_stat_info->get_unit().status_) { // replica is still in member_list, but unit is in DELETING status // If this is a duplicate log stream // 1.1 for R-replica: execute remove_learner task directly @@ -3612,7 +3598,7 @@ int ObDRWorker::try_shrink_resource_pools( } else { // generate task for normal log stream replica if (0 == ls_status_info->ls_group_id_ - || ls_status_info->unit_group_id_ != 
unit_stat_info->get_unit_info().unit_.unit_group_id_) { + || ls_status_info->unit_group_id_ != unit_stat_info->get_unit().unit_group_id_) { //If the Unit Group is in the DELETING status, we need to migrate out the LS that do not belong to that Unit Group. //LS belonging to this Unit Group will be automatically processed by Balance module // 2.1 try generate and execute migrate replica for normal log stream @@ -3767,7 +3753,7 @@ int ObDRWorker::try_migrate_replica_for_deleting_unit_( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(ls_replica), K(ls_status_info)); } else { - share::ObUnitInfo dest_unit; + share::ObUnit dest_unit; if (OB_FAIL(unit_provider.allocate_unit( ls_replica.get_zone(), ls_status_info.unit_group_id_, @@ -3782,7 +3768,7 @@ int ObDRWorker::try_migrate_replica_for_deleting_unit_( } } else { ObReplicaMember dst_member( - dest_unit.unit_.server_, + dest_unit.server_, ObTimeUtility::current_time(), ls_replica.get_replica_type(), ls_replica.get_memstore_percent()); @@ -3977,8 +3963,8 @@ int ObDRWorker::find_valid_readonly_replica_( if (OB_FAIL(target_replica.assign(*replica))) { LOG_WARN("fail to assign replica", KR(ret), KPC(replica)); } else { - unit_id = unit_stat_info->get_unit_info().unit_.unit_id_; - unit_group_id = unit_stat_info->get_unit_info().unit_.unit_group_id_; + unit_id = unit_stat_info->get_unit().unit_id_; + unit_group_id = unit_stat_info->get_unit().unit_group_id_; find_a_valid_readonly_replica = true; LOG_INFO("find a valid readonly replica to do type transform", K(dr_ls_info), K(exclude_replica), K(target_zone), K(target_replica)); @@ -4134,7 +4120,7 @@ int ObDRWorker::check_need_generate_cancel_unit_migration_task( KP(unit_stat_info), KP(unit_in_group_stat_info)); } else if (unit_stat_info->is_in_pool() - && unit_stat_info->get_unit_info().unit_.migrate_from_server_.is_valid() + && unit_stat_info->get_unit().migrate_from_server_.is_valid() && unit_stat_info->get_server_stat()->is_block() && ls_replica->get_server() == unit_stat_info->get_server_stat()->get_server()) { if (ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type()) @@ -4415,14 +4401,14 @@ int ObDRWorker::check_need_generate_migrate_to_unit_task( KP(unit_in_group_stat_info)); } else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status() && unit_in_group_stat_info->is_in_pool() - && server_stat_info->get_server() != unit_in_group_stat_info->get_unit_info().unit_.server_ + && server_stat_info->get_server() != unit_in_group_stat_info->get_unit().server_ && unit_in_group_stat_info->get_server_stat()->is_alive() && !unit_in_group_stat_info->get_server_stat()->is_block()) { need_generate = true; is_unit_in_group_related = true; } else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status() && unit_stat_info->is_in_pool() - && server_stat_info->get_server() != unit_stat_info->get_unit_info().unit_.server_ + && server_stat_info->get_server() != unit_stat_info->get_unit().server_ && unit_stat_info->get_server_stat()->is_alive() && !unit_stat_info->get_server_stat()->is_block()) { need_generate = true; @@ -4464,9 +4450,9 @@ int ObDRWorker::construct_extra_infos_for_generate_migrate_to_unit_task( LOG_WARN("fail to choose disaster recovery data source", KR(ret)); } else if (OB_FAIL(dst_replica.assign( is_unit_in_group_related - ? unit_in_group_stat_info.get_unit_info().unit_.unit_id_ - : unit_stat_info.get_unit_info().unit_.unit_id_, - unit_in_group_stat_info.get_unit_info().unit_.unit_group_id_, + ? 
unit_in_group_stat_info.get_unit().unit_id_ + : unit_stat_info.get_unit().unit_id_, + unit_in_group_stat_info.get_unit().unit_group_id_, ls_replica.get_zone(), dst_member))) { LOG_WARN("fail to assign dst replica", KR(ret)); @@ -4602,8 +4588,8 @@ int ObDRWorker::try_migrate_to_unit( ls_replica->get_replica_type(), ls_replica->get_memstore_percent()); ObReplicaMember dst_member(is_unit_in_group_related - ? unit_in_group_stat_info->get_unit_info().unit_.server_ - : unit_stat_info->get_unit_info().unit_.server_, + ? unit_in_group_stat_info->get_unit().server_ + : unit_stat_info->get_unit().server_, ObTimeUtility::current_time(), ls_replica->get_replica_type(), ls_replica->get_memstore_percent()); @@ -4641,16 +4627,16 @@ int ObDRWorker::try_migrate_to_unit( ObDRTaskType::LS_MIGRATE_REPLICA, ObDRTaskPriority::LOW_PRI, is_unit_in_group_related - ? unit_in_group_stat_info->get_unit_info().unit_.server_ - : unit_stat_info->get_unit_info().unit_.server_, + ? unit_in_group_stat_info->get_unit().server_ + : unit_stat_info->get_unit().server_, ls_replica->get_replica_type(), old_paxos_replica_number, ls_replica->get_server(), ls_replica->get_replica_type(), old_paxos_replica_number, is_unit_in_group_related - ? unit_in_group_stat_info->get_unit_info().unit_.server_ - : unit_stat_info->get_unit_info().unit_.server_, + ? unit_in_group_stat_info->get_unit().server_ + : unit_stat_info->get_unit().server_, comment_to_set))) { LOG_WARN("fail to init a ObLSReplicaTaskDisplayInfo", KR(ret)); } else if (OB_FAIL(add_display_info(display_info))) { diff --git a/src/rootserver/ob_disaster_recovery_worker.h b/src/rootserver/ob_disaster_recovery_worker.h index 4ead779e59..4ae5fd2c68 100755 --- a/src/rootserver/ob_disaster_recovery_worker.h +++ b/src/rootserver/ob_disaster_recovery_worker.h @@ -108,7 +108,6 @@ public: int init( common::ObAddr &self_addr, common::ObServerConfig &cfg, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, ObDRTaskMgr &task_mgr, share::ObLSTableOperator &lst_operator, @@ -122,7 +121,6 @@ public: int64_t &acc_dr_task); static int check_tenant_locality_match( const uint64_t tenant_id, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, bool &locality_is_matched); @@ -516,30 +514,28 @@ private: UnitProvider() : inited_(false), tenant_id_(OB_INVALID_ID), - unit_mgr_(nullptr), unit_set_() {} int init( const uint64_t tenant_id, - DRLSInfo &dr_ls_info, - ObUnitManager *unit_mgr); + DRLSInfo &dr_ls_info); int allocate_unit( const common::ObZone &zone, const uint64_t unit_group_id, - share::ObUnitInfo &unit_info); + share::ObUnit &unit); int init_unit_set( DRLSInfo &dr_ls_info); private: int inner_get_valid_unit_( const common::ObZone &zone, - const common::ObArray &unit_array, - share::ObUnitInfo &output_unit_info, + const common::ObArray &unit_array, + share::ObUnit &output_unit, const bool &force_get, bool &found); private: bool inited_; uint64_t tenant_id_; - ObUnitManager *unit_mgr_; + share::ObUnitTableOperator unit_operator_; common::hash::ObHashSet unit_set_; }; @@ -552,7 +548,7 @@ private: class LocalityAlignment { public: - LocalityAlignment(ObUnitManager *unit_mgr, ObZoneManager *zone_mgr, DRLSInfo &dr_ls_info); + LocalityAlignment(ObZoneManager *zone_mgr, DRLSInfo &dr_ls_info); virtual ~LocalityAlignment(); int build(); int get_next_locality_alignment_task( @@ -645,7 +641,6 @@ private: private: int64_t task_idx_; AddReplicaLATask add_replica_task_; - ObUnitManager *unit_mgr_; ObZoneManager *zone_mgr_; DRLSInfo &dr_ls_info_; common::ObArray task_array_; @@ -661,7 +656,6 @@ private: 
static int check_ls_locality_match_( DRLSInfo &dr_ls_info, - ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, bool &locality_is_matched); @@ -1058,7 +1052,6 @@ private: bool dr_task_mgr_is_loaded_; common::ObAddr self_addr_; common::ObServerConfig *config_; - ObUnitManager *unit_mgr_; ObZoneManager *zone_mgr_; ObDRTaskMgr *disaster_recovery_task_mgr_; share::ObLSTableOperator *lst_operator_; diff --git a/src/rootserver/ob_migrate_unit_finish_checker.cpp b/src/rootserver/ob_migrate_unit_finish_checker.cpp index e90d887827..20ff0bd7ff 100644 --- a/src/rootserver/ob_migrate_unit_finish_checker.cpp +++ b/src/rootserver/ob_migrate_unit_finish_checker.cpp @@ -171,7 +171,6 @@ int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_by_tenant( } else { LOG_INFO("try check migrate unit finish by tenant", K(tenant_id)); DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id), - unit_mgr_, zone_mgr_, schema_service_); ObLSStatusInfoArray ls_status_info_array; @@ -252,11 +251,11 @@ int ObMigrateUnitFinishChecker::statistic_migrate_unit_by_ls( KP(server_stat_info), KP(unit_stat_info), KP(unit_in_group_stat_info)); - } else if (server_stat_info->get_server() != unit_stat_info->get_unit_info().unit_.server_ + } else if (server_stat_info->get_server() != unit_stat_info->get_unit().server_ && (ls_replica->is_in_service() || ls_status_info.ls_is_creating())) { unit_stat_info->inc_outside_replica_cnt(); if (unit_stat_info->get_outside_replica_cnt() <= 2) { // print the first two outside replica - LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit_info().unit_); + LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit()); } } } @@ -280,12 +279,12 @@ int ObMigrateUnitFinishChecker::try_finish_migrate_unit( for (; OB_SUCC(ret) && iter != inner_hash_table.end(); ++iter) { const DRUnitStatInfo &unit_stat_info = iter->v_; if (unit_stat_info.is_in_pool() - && unit_stat_info.get_unit_info().unit_.migrate_from_server_.is_valid() + && unit_stat_info.get_unit().migrate_from_server_.is_valid() && 0 == unit_stat_info.get_outside_replica_cnt()) { if (OB_FAIL(unit_mgr_->finish_migrate_unit( - unit_stat_info.get_unit_info().unit_.unit_id_))) { + unit_stat_info.get_unit().unit_id_))) { LOG_WARN("fail to set unit migrate finish", KR(ret), - "unit_id", unit_stat_info.get_unit_info().unit_.unit_id_); + "unit_id", unit_stat_info.get_unit().unit_id_); } } } diff --git a/src/rootserver/ob_root_balancer.cpp b/src/rootserver/ob_root_balancer.cpp index 0bdfbbb8a1..5688ebe083 100644 --- a/src/rootserver/ob_root_balancer.cpp +++ b/src/rootserver/ob_root_balancer.cpp @@ -79,7 +79,7 @@ int ObRootBalancer::init(common::ObServerConfig &cfg, } else if (OB_FAIL(create(root_balancer_thread_cnt, "RootBalance"))) { LOG_WARN("create root balancer thread failed", K(ret), K(root_balancer_thread_cnt)); } else if (OB_FAIL(disaster_recovery_worker_.init( - self_addr, cfg, unit_mgr, zone_mgr, + self_addr, cfg, zone_mgr, dr_task_mgr, *GCTX.lst_operator_, schema_service, rpc_proxy, sql_proxy))) { LOG_WARN("fail to init disaster recovery worker", KR(ret)); } else if (OB_FAIL(rootservice_util_checker_.init( diff --git a/src/rootserver/ob_root_minor_freeze.cpp b/src/rootserver/ob_root_minor_freeze.cpp index b8a119473c..5921d17156 100644 --- a/src/rootserver/ob_root_minor_freeze.cpp +++ b/src/rootserver/ob_root_minor_freeze.cpp @@ -111,46 +111,6 @@ bool ObRootMinorFreeze::is_server_alive(const ObAddr &server) const return is_alive; } -int ObRootMinorFreeze::get_tenant_server_list(uint64_t tenant_id, - 
-                             ObIArray<ObAddr> &target_server_list) const
-{
-  int ret = OB_SUCCESS;
-
-  target_server_list.reset();
-  ObSEArray pool_ids;
-  if (OB_FAIL(unit_manager_->get_pool_ids_of_tenant(tenant_id, pool_ids))) {
-    LOG_WARN("fail to get pool ids of tenant", K(tenant_id), K(ret));
-  } else {
-    ObSEArray units;
-
-    for (int i = 0; OB_SUCC(ret) && i < pool_ids.count(); ++i) {
-      units.reset();
-      if (OB_FAIL(unit_manager_->get_unit_infos_of_pool(pool_ids.at(i), units))) {
-        LOG_WARN("fail to get unit infos of pool", K(pool_ids.at(i)), K(ret));
-      } else {
-        for (int j = 0; j < units.count(); ++j) {
-          if (OB_LIKELY(units.at(j).is_valid())) {
-            const share::ObUnit &unit = units.at(j).unit_;
-            if (is_server_alive(unit.migrate_from_server_)) {
-              if (OB_FAIL(target_server_list.push_back(unit.migrate_from_server_))) {
-                LOG_WARN("fail to push server, ", K(ret));
-              }
-            }
-
-            if (is_server_alive(unit.server_)) {
-              if (OB_FAIL(target_server_list.push_back(unit.server_))) {
-                LOG_WARN("fail to push server, ", K(ret));
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  return ret;
-}
-
 int ObRootMinorFreeze::try_minor_freeze(const obrpc::ObRootMinorFreezeArg &arg) const
 {
   int ret = OB_SUCCESS;
@@ -334,7 +294,10 @@ int ObRootMinorFreeze::init_params_by_tenant(const ObIArray &tenant_id
       }
     } else {
       // TODO: filter servers according to tenant_id
-      if (OB_FAIL(get_tenant_server_list(tenant_ids.at(i), target_server_list))) {
+      if (OB_ISNULL(unit_manager_)) {
+        ret = OB_ERR_UNEXPECTED;
+        LOG_WARN("unit_manager_ is null", KR(ret), KP(unit_manager_));
+      } else if (OB_FAIL(unit_manager_->get_tenant_alive_servers_non_block(tenant_ids.at(i), target_server_list))) {
         LOG_WARN("fail to get tenant server list, ", K(ret));
       } else {
         bool server_in_zone = false;
diff --git a/src/rootserver/ob_root_minor_freeze.h b/src/rootserver/ob_root_minor_freeze.h
index 7934b396d9..d445bf95d3 100644
--- a/src/rootserver/ob_root_minor_freeze.h
+++ b/src/rootserver/ob_root_minor_freeze.h
@@ -97,8 +97,6 @@ private:
   int check_cancel() const;
   bool is_server_alive(const common::ObAddr &server) const;
-  int get_tenant_server_list(uint64_t tenant_id,
-                             common::ObIArray<common::ObAddr> &target_server_list) const;
 
   bool inited_;
diff --git a/src/rootserver/ob_rootservice_util_checker.cpp b/src/rootserver/ob_rootservice_util_checker.cpp
index 50e81c5016..a7545783c9 100644
--- a/src/rootserver/ob_rootservice_util_checker.cpp
+++ b/src/rootserver/ob_rootservice_util_checker.cpp
@@ -54,7 +54,6 @@ int ObRootServiceUtilChecker::init(
       schema_service,
       common_rpc_proxy,
       self,
-      unit_mgr,
       zone_mgr,
       sql_proxy,
       lst_operator))) {
diff --git a/src/share/ob_unit_table_operator.cpp b/src/share/ob_unit_table_operator.cpp
index e3b0c90b12..7eb1a17e00 100644
--- a/src/share/ob_unit_table_operator.cpp
+++ b/src/share/ob_unit_table_operator.cpp
@@ -743,6 +743,44 @@ int ObUnitTableOperator::get_units_by_unit_group_id(
   return ret;
 }
 
+
+int ObUnitTableOperator::get_unit_in_group(
+    const uint64_t unit_group_id,
+    const common::ObZone &zone,
+    share::ObUnit &unit)
+{
+  int ret = OB_SUCCESS;
+  common::ObArray<ObUnit> unit_array;
+  if (!inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("not init", KR(ret));
+  } else if (OB_UNLIKELY(zone.is_empty() || OB_INVALID_ID == unit_group_id)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("invalid argument", KR(ret), K(zone), K(unit_group_id));
+  } else if (OB_FAIL(get_units_by_unit_group_id(unit_group_id, unit_array))) {
+    LOG_WARN("fail to get unit group", KR(ret), K(unit_group_id));
+  } else {
+    bool found = false;
+    for (int64_t i = 0; !found && OB_SUCC(ret) && i < unit_array.count(); ++i) {
+      const share::ObUnit &this_unit = unit_array.at(i);
+      if (this_unit.zone_ != zone) {
+        // bypass
+      } else if (OB_FAIL(unit.assign(this_unit))) {
+        LOG_WARN("fail to assign unit info", KR(ret));
+      } else {
+        found = true;
+      }
+    }
+    if (OB_FAIL(ret)) {
+      // failed
+    } else if (!found) {
+      ret = OB_ENTRY_NOT_EXIST;
+      LOG_WARN("unit not found", KR(ret), K(unit_group_id), K(zone));
+    } else {} // good
+  }
+  return ret;
+}
+
 int ObUnitTableOperator::get_units_by_resource_pools(
     const ObIArray &pools,
     common::ObIArray &units)
diff --git a/src/share/ob_unit_table_operator.h b/src/share/ob_unit_table_operator.h
index e43d59e731..fd77ef6bd3 100644
--- a/src/share/ob_unit_table_operator.h
+++ b/src/share/ob_unit_table_operator.h
@@ -79,6 +79,16 @@ public:
   int get_units_by_unit_group_id(const uint64_t unit_group_id,
                                  common::ObIArray<ObUnit> &units);
+
+  // get unit in specific unit_group and zone, if such a unit does not exist,
+  // then return ret == OB_ENTRY_NOT_EXIST
+  // @param [in] unit_group_id, target unit_group_id
+  // @param [in] zone, target zone
+  // @param [out] unit, unit in specific unit_group_id and zone
+  int get_unit_in_group(const uint64_t unit_group_id,
+                        const common::ObZone &zone,
+                        share::ObUnit &unit);
+
   int get_units_by_resource_pools(const ObIArray &pools,
                                   common::ObIArray &units);
   int get_units_by_tenant(const uint64_t tenant_id,
diff --git a/src/share/unit/ob_unit_info.cpp b/src/share/unit/ob_unit_info.cpp
index 5f48242c8c..845f888440 100644
--- a/src/share/unit/ob_unit_info.cpp
+++ b/src/share/unit/ob_unit_info.cpp
@@ -58,6 +58,26 @@ void ObUnit::reset()
   replica_type_ = REPLICA_TYPE_FULL;
 }
 
+int ObUnit::assign(const ObUnit& that)
+{
+  int ret = OB_SUCCESS;
+  if (this == &that) {
+    //skip
+  } else if (OB_FAIL(zone_.assign(that.zone_))) {
+    LOG_WARN("zone_ assign failed", KR(ret), K(that.zone_));
+  } else {
+    unit_id_ = that.unit_id_;
+    resource_pool_id_ = that.resource_pool_id_;
+    unit_group_id_ = that.unit_group_id_;
+    server_ = that.server_;
+    migrate_from_server_ = that.migrate_from_server_;
+    is_manual_migrate_ = that.is_manual_migrate_;
+    status_ = that.status_;
+    replica_type_ = that.replica_type_;
+  }
+  return ret;
+}
+
 bool ObUnit::is_valid() const
 {
   // it's ok for migrate_from_server to be invalid
diff --git a/src/share/unit/ob_unit_info.h b/src/share/unit/ob_unit_info.h
index b2bf0376b1..e630a04c1f 100644
--- a/src/share/unit/ob_unit_info.h
+++ b/src/share/unit/ob_unit_info.h
@@ -42,6 +42,7 @@ public:
   ObUnit();
   ~ObUnit() {}
   inline bool operator <(const ObUnit &unit) const;
+  int assign(const ObUnit& that);
   void reset();
   bool is_valid() const;
   bool is_manual_migrate() const { return is_manual_migrate_; }
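
For context on how callers are expected to use the new path: the hunks above swap direct ObUnitManager access for reads through share::ObUnitTableOperator backed by GCTX.sql_proxy_. Below is a minimal usage sketch based only on the interfaces shown in this diff (ObUnitTableOperator::init / get_unit_in_group and the share::ObUnit fields); the helper name, log prefix, include paths, and error handling are illustrative assumptions, not part of the patch.

#define USING_LOG_PREFIX RS                // log prefix chosen for this sketch only

#include "share/ob_unit_table_operator.h"  // share::ObUnitTableOperator (API added above)
#include "share/unit/ob_unit_info.h"       // share::ObUnit
#include "observer/ob_server_struct.h"     // GCTX (assumed include path)

using namespace oceanbase;
using namespace oceanbase::common;

// Hypothetical helper: resolve the unit that serves a given unit group in a zone.
int lookup_unit_in_group(const uint64_t unit_group_id, const ObZone &zone)
{
  int ret = OB_SUCCESS;
  share::ObUnitTableOperator unit_operator;
  share::ObUnit unit;
  if (OB_ISNULL(GCTX.sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql_proxy_ is null", KR(ret));
  } else if (OB_FAIL(unit_operator.init(*GCTX.sql_proxy_))) {
    // same init pattern as DRLSInfo::init() and UnitProvider::init() in this patch
    LOG_WARN("init unit table operator failed", KR(ret));
  } else if (OB_FAIL(unit_operator.get_unit_in_group(unit_group_id, zone, unit))) {
    // OB_ENTRY_NOT_EXIST: no unit of this unit group is deployed in the zone
    LOG_WARN("fail to get unit in group", KR(ret), K(unit_group_id), K(zone));
  } else {
    LOG_INFO("found unit", K(unit.unit_id_), K(unit.server_),
             "is_active", unit.is_active_status());
  }
  return ret;
}

Reading unit rows straight from the inner table this way is what lets the disaster recovery code drop its dependency on rootserver's in-memory ObUnitManager state.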