modify two methods of unit_mgr, use inner table to get information

This commit is contained in:
obdev
2024-02-04 06:12:39 +00:00
committed by ob-robot
parent 2f0ea3c372
commit 5e08668e62
15 changed files with 221 additions and 223 deletions

View File

@ -34,7 +34,6 @@ ObAlterLocalityFinishChecker::ObAlterLocalityFinishChecker(volatile bool &stop)
schema_service_(NULL),
common_rpc_proxy_(NULL),
self_(),
unit_mgr_(NULL),
zone_mgr_(NULL),
sql_proxy_(NULL),
stop_(stop)
@ -49,7 +48,6 @@ int ObAlterLocalityFinishChecker::init(
share::schema::ObMultiVersionSchemaService &schema_service,
obrpc::ObCommonRpcProxy &common_rpc_proxy,
common::ObAddr &addr,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
common::ObMySQLProxy &sql_proxy,
share::ObLSTableOperator &lst_operator)
@ -65,7 +63,6 @@ int ObAlterLocalityFinishChecker::init(
schema_service_ = &schema_service;
common_rpc_proxy_ = &common_rpc_proxy;
self_ = addr;
unit_mgr_ = &unit_mgr;
zone_mgr_ = &zone_mgr;
sql_proxy_ = &sql_proxy;
lst_operator_ = &lst_operator;
@ -86,11 +83,10 @@ int ObAlterLocalityFinishChecker::check()
ret = OB_NOT_INIT;
LOG_WARN("ObAlterLocalityFinishChecker not init", KR(ret));
} else if (OB_ISNULL(schema_service_)
|| OB_ISNULL(unit_mgr_)
|| OB_ISNULL(zone_mgr_)
|| !self_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KP_(schema_service), KP_(unit_mgr), KP_(zone_mgr), K_(self));
LOG_WARN("invalid argument", KR(ret), KP_(schema_service), KP_(zone_mgr), K_(self));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("ObAlterLocalityFinishChecker stopped", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
@ -139,14 +135,12 @@ int ObAlterLocalityFinishChecker::check()
}
} else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match(
tenant_id,
*unit_mgr_,
*zone_mgr_,
alter_locality_finish))){
LOG_WARN("fail to check tenant locality match", KR(tmp_ret), K(tenant_id), K(alter_locality_finish));
} else if (is_user_tenant(tenant_id)
&& OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match(
gen_meta_tenant_id(tenant_id),
*unit_mgr_,
*zone_mgr_,
meta_alter_locality_finish))){
LOG_WARN("fail to check tenant locality match", KR(tmp_ret), "meta_tenant_id",

View File

@ -58,7 +58,6 @@ public:
share::schema::ObMultiVersionSchemaService &schema_service,
obrpc::ObCommonRpcProxy &common_rpc_proxy,
common::ObAddr &self,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
common::ObMySQLProxy &sql_proxy,
share::ObLSTableOperator &lst_operator);
@ -74,7 +73,6 @@ private:
share::schema::ObMultiVersionSchemaService *schema_service_;
obrpc::ObCommonRpcProxy *common_rpc_proxy_; //use GCTX.rs_rpc_proxy_
common::ObAddr self_;
ObUnitManager *unit_mgr_;
ObZoneManager *zone_mgr_;
common::ObMySQLProxy *sql_proxy_;
share::ObLSTableOperator *lst_operator_;

View File

@ -63,21 +63,21 @@ int DRServerStatInfo::assign(
int DRUnitStatInfo::init(
const uint64_t unit_id,
const bool in_pool,
const share::ObUnitInfo &unit_info,
const share::ObUnit &unit,
DRServerStatInfo *server_stat,
const int64_t outside_replica_cnt)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == unit_id
|| !unit_info.is_valid()
|| !unit.is_valid()
|| nullptr == server_stat)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(unit_id), K(unit_info), KP(server_stat));
LOG_WARN("invalid argument", KR(ret), K(unit_id), K(unit), KP(server_stat));
} else {
unit_id_ = unit_id;
in_pool_ = in_pool;
if (OB_FAIL(unit_info_.assign(unit_info))) {
LOG_WARN("fail to assign", KR(ret));
if (OB_FAIL(unit_.assign(unit))) {
LOG_WARN("fail to assign", KR(ret), K(unit));
} else {
server_stat_ = server_stat;
outside_replica_cnt_ = outside_replica_cnt;
@ -90,11 +90,11 @@ int DRUnitStatInfo::assign(
const DRUnitStatInfo &that)
{
int ret = OB_SUCCESS;
if (OB_FAIL(this->unit_.assign(that.unit_))) {
LOG_WARN("fail to assign unit", KR(ret), K(that.unit_));
} else {
this->unit_id_ = that.unit_id_;
this->in_pool_ = that.in_pool_;
if (OB_FAIL(this->unit_info_.assign(that.unit_info_))) {
LOG_WARN("fail to assign", KR(ret));
} else {
this->server_stat_ = that.server_stat_;
this->outside_replica_cnt_ = that.outside_replica_cnt_;
}
@ -113,6 +113,11 @@ int DRLSInfo::init()
} else if (OB_UNLIKELY(nullptr == schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_));
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(unit_operator_.init(*GCTX.sql_proxy_))) {
LOG_WARN("unit operator init failed", KR(ret));
} else if (OB_FAIL(unit_stat_info_map_.init(
UNIT_MAP_BUCKET_NUM))) {
LOG_WARN("fail to init unit stat info map", KR(ret));
@ -314,32 +319,29 @@ int DRLSInfo::fill_servers()
int DRLSInfo::fill_units()
{
int ret = OB_SUCCESS;
common::ObArray<share::ObUnitInfo> unit_info_array;
if (OB_UNLIKELY(nullptr == unit_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_));
} else if (OB_UNLIKELY(OB_INVALID_ID == resource_tenant_id_)) {
common::ObArray<share::ObUnit> unit_array;
if (OB_UNLIKELY(OB_INVALID_ID == resource_tenant_id_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(resource_tenant_id_));
} else if (OB_FAIL(unit_mgr_->get_all_unit_infos_by_tenant(
resource_tenant_id_, unit_info_array))) {
} else if (OB_FAIL(unit_operator_.get_units_by_tenant(
resource_tenant_id_, unit_array))) {
LOG_WARN("fail to get all unit infos by tenant", KR(ret), K(resource_tenant_id_));
} else {
FOREACH_X(u, unit_info_array, OB_SUCC(ret)) {
FOREACH_X(u, unit_array, OB_SUCC(ret)) {
UnitStatInfoMap::Item *item = nullptr;
ServerStatInfoMap::Item *server_item = nullptr;
if (OB_UNLIKELY(nullptr == u)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("u ptr is null", KR(ret));
} else if (OB_FAIL(unit_stat_info_map_.locate(u->unit_.unit_id_, item))) {
LOG_WARN("fail to locate unit", KR(ret), "unit_id", u->unit_.unit_id_);
} else if (OB_FAIL(server_stat_info_map_.locate(u->unit_.server_, server_item))) {
LOG_WARN("fail to locate server", KR(ret), "server", u->unit_.server_);
} else if (OB_FAIL(unit_stat_info_map_.locate(u->unit_id_, item))) {
LOG_WARN("fail to locate unit", KR(ret), "unit_id", u->unit_id_);
} else if (OB_FAIL(server_stat_info_map_.locate(u->server_, server_item))) {
LOG_WARN("fail to locate server", KR(ret), "server", u->server_);
} else if (OB_UNLIKELY(nullptr == item || nullptr == server_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("item ptr is null", KR(ret));
} else if (OB_FAIL(item->v_.init(
u->unit_.unit_id_,
u->unit_id_,
true, /*in pool*/
*u,
&server_item->v_,
@ -445,9 +447,9 @@ int DRLSInfo::build_disaster_ls_info(
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("DRWorker not init", KR(ret));
} else if (OB_ISNULL(schema_service_) || OB_ISNULL(unit_mgr_)) {
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_), KP(unit_mgr_));
LOG_WARN("schema service ptr is null", KR(ret), KP(schema_service_));
} else if (resource_tenant_id_ != gen_user_tenant_id(ls_info.get_tenant_id())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant id not match", KR(ret), K(resource_tenant_id_),
@ -479,37 +481,36 @@ int DRLSInfo::build_disaster_ls_info(
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < inner_ls_info_.get_replicas().count(); ++i) {
ServerStatInfoMap::Item *server = nullptr;
UnitStatInfoMap::Item *unit = nullptr;
UnitStatInfoMap::Item *unit_in_map = nullptr;
UnitStatInfoMap::Item *unit_in_group = nullptr;
share::ObUnitInfo unit_info;
share::ObUnit unit;
share::ObLSReplica &ls_replica = inner_ls_info_.get_replicas().at(i);
if (!ls_replica.get_in_member_list() && !ls_replica.get_in_learner_list()) {
LOG_INFO("replica is neither in member list nor in learner list", K(ls_replica));
} else if (OB_FAIL(server_stat_info_map_.locate(ls_replica.get_server(), server))) {
LOG_WARN("fail to locate server", KR(ret), "server", ls_replica.get_server());
} else if (OB_FAIL(unit_stat_info_map_.locate(ls_replica.get_unit_id(), unit))) {
} else if (OB_FAIL(unit_stat_info_map_.locate(ls_replica.get_unit_id(), unit_in_map))) {
LOG_WARN("fail to locate unit", KR(ret), "unit_id", ls_replica.get_unit_id());
} else {
if (0 == ls_status_info.unit_group_id_) {
unit_in_group = unit;
} else if (OB_FAIL(unit_mgr_->get_unit_in_group(
ls_replica.get_tenant_id(),
unit_in_group = unit_in_map;
} else if (OB_FAIL(unit_operator_.get_unit_in_group(
ls_status_info.unit_group_id_,
ls_replica.get_zone(),
unit_info))) {
unit))) {
LOG_WARN("fail to get unit in group", KR(ret), K(ls_replica));
} else if (OB_FAIL(unit_stat_info_map_.locate(unit_info.unit_.unit_id_, unit_in_group))) {
LOG_WARN("fail to locate unit", KR(ret), "unit_id", unit_info.unit_.unit_id_);
} else if (OB_FAIL(unit_stat_info_map_.locate(unit.unit_id_, unit_in_group))) {
LOG_WARN("fail to locate unit", KR(ret), "unit_id", unit.unit_id_);
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(server) || OB_ISNULL(unit) || OB_ISNULL(unit_in_group)) {
if (OB_ISNULL(server) || OB_ISNULL(unit_in_map) || OB_ISNULL(unit_in_group)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit or server ptr is null", KR(ret), KP(server), KP(unit), K(ls_replica));
LOG_WARN("unit or server ptr is null", KR(ret), KP(server), KP(unit_in_map), K(ls_replica));
} else if (OB_FAIL(append_replica_server_unit_stat(
&server->v_, &unit->v_, &unit_in_group->v_))) {
&server->v_, &unit_in_map->v_, &unit_in_group->v_))) {
LOG_WARN("fail to append replica server/unit stat", KR(ret),
"server_stat_info", server->v_, "unit_stat_info", unit->v_);
"server_stat_info", server->v_, "unit_stat_info", unit_in_map->v_);
}
}
}

View File

@ -91,7 +91,7 @@ struct DRUnitStatInfo
public:
DRUnitStatInfo() : unit_id_(common::OB_INVALID_ID),
in_pool_(false),
unit_info_(),
unit_(),
server_stat_(nullptr),
outside_replica_cnt_(0) {}
public:
@ -101,7 +101,7 @@ public:
TO_STRING_KV(K_(unit_id),
K_(in_pool),
K_(unit_info),
K_(unit),
KPC_(server_stat));
int assign(
@ -110,20 +110,20 @@ public:
int init(
const uint64_t unit_id,
const bool in_pool,
const share::ObUnitInfo &unit_info,
const share::ObUnit &unit,
DRServerStatInfo *server_stat,
const int64_t outside_replica_cnt);
public:
uint64_t get_unit_id() const { return unit_id_; }
bool is_in_pool() const { return in_pool_; }
const share::ObUnitInfo &get_unit_info() const { return unit_info_; }
const share::ObUnit &get_unit() const { return unit_; }
const DRServerStatInfo *get_server_stat() const { return server_stat_; }
int64_t get_outside_replica_cnt() const { return outside_replica_cnt_; }
void inc_outside_replica_cnt() { ++outside_replica_cnt_; }
private:
uint64_t unit_id_;
bool in_pool_;
share::ObUnitInfo unit_info_;
share::ObUnit unit_;
DRServerStatInfo *server_stat_;
int64_t outside_replica_cnt_;
};
@ -139,12 +139,10 @@ class DRLSInfo
{
public:
DRLSInfo(const uint64_t resource_tenant_id,
ObUnitManager *unit_mgr,
ObZoneManager *zone_mgr,
share::schema::ObMultiVersionSchemaService *schema_service)
: resource_tenant_id_(resource_tenant_id),
sys_schema_guard_(),
unit_mgr_(unit_mgr),
zone_mgr_(zone_mgr),
schema_service_(schema_service),
unit_stat_info_map_("DRUnitStatMap"),
@ -163,7 +161,7 @@ public:
inited_(false) {}
virtual ~DRLSInfo() {}
public:
// use user_tenant_id to init unit_info and locality
// use user_tenant_id to init unit and locality
int init();
int build_disaster_ls_info(
const share::ObLSInfo &ls_info,
@ -241,7 +239,7 @@ private:
private:
uint64_t resource_tenant_id_;
share::schema::ObSchemaGetterGuard sys_schema_guard_;
ObUnitManager *unit_mgr_;
share::ObUnitTableOperator unit_operator_;
ObZoneManager *zone_mgr_;
share::schema::ObMultiVersionSchemaService *schema_service_;
UnitStatInfoMap unit_stat_info_map_;

View File

@ -221,12 +221,10 @@ int ObDRWorker::LocalityAlignment::locate_zone_locality(
return ret;
}
ObDRWorker::LocalityAlignment::LocalityAlignment(ObUnitManager *unit_mgr,
ObZoneManager *zone_mgr,
ObDRWorker::LocalityAlignment::LocalityAlignment(ObZoneManager *zone_mgr,
DRLSInfo &dr_ls_info)
: task_idx_(0),
add_replica_task_(),
unit_mgr_(unit_mgr),
zone_mgr_(zone_mgr),
dr_ls_info_(dr_ls_info),
task_array_(),
@ -274,10 +272,7 @@ int ObDRWorker::LocalityAlignment::build_locality_stat_map()
int ret = OB_SUCCESS;
uint64_t tenant_id = OB_INVALID_ID;
share::ObLSID ls_id;
if (OB_ISNULL(unit_mgr_)) {
ret = OB_NOT_INIT;
LOG_WARN("LocalityAlignment not init", KR(ret), KP(unit_mgr_));
} else if (OB_FAIL(dr_ls_info_.get_ls_id(tenant_id, ls_id))) {
if (OB_FAIL(dr_ls_info_.get_ls_id(tenant_id, ls_id))) {
LOG_WARN("fail to get ls id", KR(ret));
} else {
LOG_INFO("build ls locality stat map", K(tenant_id), K(ls_id));
@ -464,8 +459,8 @@ int ObDRWorker::LocalityAlignment::try_remove_match(
LOG_WARN("zone replica desc ptr is null", KR(ret), K(zone));
} else {
bool has_correct_dest_replica = false;
if (replica->get_server() != unit_stat_info->get_unit_info().unit_.server_
|| !unit_stat_info->get_unit_info().unit_.is_active_status()) {
if (replica->get_server() != unit_stat_info->get_unit().server_
|| !unit_stat_info->get_unit().is_active_status()) {
// this replica is migrating or unit is deleting, check whether has a correct dest replica
LOG_TRACE("try to check whether has dest replica", KPC(replica), KPC(unit_stat_info));
const int64_t map_count = replica_stat_map_.count();
@ -482,9 +477,9 @@ int ObDRWorker::LocalityAlignment::try_remove_match(
} else if (replica->is_in_service()
&& replica_stat_desc_to_compare.replica_->get_replica_type() == replica->get_replica_type()
&& replica_stat_desc_to_compare.replica_->get_server() != replica->get_server()
&& (replica_stat_desc_to_compare.replica_->get_server() == unit_stat_info->get_unit_info().unit_.server_
|| replica_stat_desc_to_compare.replica_->get_server() == replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.server_)
&& replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.is_active_status()) {
&& (replica_stat_desc_to_compare.replica_->get_server() == unit_stat_info->get_unit().server_
|| replica_stat_desc_to_compare.replica_->get_server() == replica_stat_desc_to_compare.unit_stat_info_->get_unit().server_)
&& replica_stat_desc_to_compare.unit_stat_info_->get_unit().is_active_status()) {
// A replica is a correct dest replica if these conditions above all satisfied
// (1) replica is in member_list(learner_lsit)
// (2) replica type is expected
@ -499,9 +494,9 @@ int ObDRWorker::LocalityAlignment::try_remove_match(
"replica_type", replica->get_replica_type(),
"server_to_compare", replica_stat_desc_to_compare.replica_->get_server(),
"server", replica->get_server(),
"server_with_unit", unit_stat_info->get_unit_info().unit_.server_,
"server_with_unit_to_compare", replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.server_,
"unit_status_is_active", replica_stat_desc_to_compare.unit_stat_info_->get_unit_info().unit_.is_active_status());
"server_with_unit", unit_stat_info->get_unit().server_,
"server_with_unit_to_compare", replica_stat_desc_to_compare.unit_stat_info_->get_unit().server_,
"unit_status_is_active", replica_stat_desc_to_compare.unit_stat_info_->get_unit().is_active_status());
}
}
}
@ -514,8 +509,8 @@ int ObDRWorker::LocalityAlignment::try_remove_match(
&& replica->get_memstore_percent() == replica_desc.memstore_percent_
&& replica_desc.replica_num_ > 0
&& (!has_correct_dest_replica
|| (unit_stat_info->get_unit_info().unit_.is_active_status()
&& server_stat_info->get_server() == unit_stat_info->get_unit_info().unit_.server_))) {
|| (unit_stat_info->get_unit().is_active_status()
&& server_stat_info->get_server() == unit_stat_info->get_unit().server_))) {
found = true;
if (OB_FAIL(replica_stat_map_.remove(index))) {
LOG_WARN("fail to remove from stat map", KR(ret));
@ -741,14 +736,14 @@ int ObDRWorker::LocalityAlignment::try_generate_type_transform_task_for_readonly
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(replica_stat_desc));
} else {
const share::ObUnitInfo &unit_info = replica_stat_desc.unit_stat_info_->get_unit_info();
const share::ObUnit &unit = replica_stat_desc.unit_stat_info_->get_unit();
bool server_is_active = false;
if (!unit_info.unit_.is_active_status()) {
FLOG_INFO("unit status is not normal, can not generate type transform task", K(unit_info));
} else if (OB_FAIL(SVR_TRACER.check_server_active(unit_info.unit_.server_, server_is_active))) {
LOG_WARN("fail to check server is active", KR(ret), K(unit_info));
if (!unit.is_active_status()) {
FLOG_INFO("unit status is not normal, can not generate type transform task", K(unit));
} else if (OB_FAIL(SVR_TRACER.check_server_active(unit.server_, server_is_active))) {
LOG_WARN("fail to check server is active", KR(ret), K(unit));
} else if (!server_is_active) {
FLOG_INFO("server status is not active, can not generate type transform task", K(unit_info));
FLOG_INFO("server status is not active, can not generate type transform task", K(unit));
} else if (OB_FAIL(generate_type_transform_task(
replica_stat_desc,
replica_desc.replica_type_,
@ -1053,8 +1048,8 @@ int ObDRWorker::LocalityAlignment::generate_type_transform_task(
} else {
task->zone_ = replica->get_zone();
task->dst_server_ = replica->get_server();
task->unit_id_ = unit_stat_info->get_unit_info().unit_.unit_id_;
task->unit_group_id_ = unit_stat_info->get_unit_info().unit_.unit_group_id_;
task->unit_id_ = unit_stat_info->get_unit().unit_id_;
task->unit_group_id_ = unit_stat_info->get_unit().unit_group_id_;
task->src_replica_type_ = replica->get_replica_type();
task->src_memstore_percent_ = replica->get_memstore_percent();
task->src_member_time_us_ = replica->get_member_time_us();
@ -1139,10 +1134,9 @@ int ObDRWorker::LocalityAlignment::build()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = OB_INVALID_ID;
if (OB_UNLIKELY(nullptr == unit_mgr_
|| nullptr == zone_mgr_)) {
if (OB_UNLIKELY(nullptr == zone_mgr_)) {
ret = OB_NOT_INIT;
LOG_WARN("LocalityAlignment not init", KR(ret), KP(unit_mgr_), KP(zone_mgr_));
LOG_WARN("LocalityAlignment not init", KR(ret), KP(zone_mgr_));
} else if (OB_FAIL(locality_map_.create(LOCALITY_MAP_BUCKET_NUM, "LocAlign"))) {
LOG_WARN("fail to create locality map", KR(ret));
} else if (OB_FAIL(generate_paxos_replica_number())) {
@ -1156,7 +1150,7 @@ int ObDRWorker::LocalityAlignment::build()
} else if (OB_FAIL(dr_ls_info_.get_tenant_id(tenant_id))) {
LOG_WARN("fail to get tenant id", KR(ret), K(tenant_id));
} else if (OB_FAIL(unit_provider_.init(gen_user_tenant_id(tenant_id),
dr_ls_info_, unit_mgr_))) {
dr_ls_info_))) {
LOG_WARN("fail to init unit provider", KR(ret), K(tenant_id), K_(dr_ls_info));
}
return ret;
@ -1222,14 +1216,14 @@ int ObDRWorker::LocalityAlignment::try_review_add_replica_task(
LOG_WARN("ls status info ptr is null", KR(ret), KP(ls_status_info));
} else {
found = false;
share::ObUnitInfo unit_info;
share::ObUnit unit;
const common::ObZone &zone = my_task->zone_;
int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit_info);
int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit);
if (OB_ITER_END == tmp_ret) {
// bypass
} else if (OB_SUCCESS == tmp_ret) {
my_task->dst_server_ = unit_info.unit_.server_;
my_task->unit_id_ = unit_info.unit_.unit_id_;
my_task->dst_server_ = unit.server_;
my_task->unit_id_ = unit.unit_id_;
my_task->unit_group_id_ = ls_status_info->unit_group_id_;
if (ObReplicaTypeCheck::is_paxos_replica_V2(my_task->replica_type_)) {
int64_t new_paxos_replica_number = 0;
@ -1457,15 +1451,15 @@ int ObDRWorker::LocalityAlignment::try_get_readonly_all_server_locality_alignmen
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls status info ptr is null", KR(ret), KP(ls_status_info));
} else {
share::ObUnitInfo unit_info;
int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit_info);
share::ObUnit unit;
int tmp_ret = unit_provider.allocate_unit(zone, ls_status_info->unit_group_id_, unit);
if (OB_ITER_END == tmp_ret) {
// bypass
} else if (OB_SUCCESS == tmp_ret) {
add_replica_task_.zone_ = zone;
add_replica_task_.dst_server_ = unit_info.unit_.server_;
add_replica_task_.dst_server_ = unit.server_;
add_replica_task_.member_time_us_ = ObTimeUtility::current_time();
add_replica_task_.unit_id_ = unit_info.unit_.unit_id_;
add_replica_task_.unit_id_ = unit.unit_id_;
add_replica_task_.unit_group_id_ = ls_status_info->unit_group_id_;
add_replica_task_.replica_type_ = REPLICA_TYPE_READONLY;
add_replica_task_.memstore_percent_ = replica_desc_array->readonly_memstore_percent_;
@ -1512,21 +1506,24 @@ int ObDRWorker::LocalityAlignment::get_next_locality_alignment_task(
int ObDRWorker::UnitProvider::init(
const uint64_t tenant_id,
DRLSInfo &dr_ls_info,
ObUnitManager *unit_mgr)
DRLSInfo &dr_ls_info)
{
int ret = OB_SUCCESS;
int64_t replica_cnt = 0;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret));
} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || nullptr == unit_mgr)) {
} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret),
K(tenant_id),
KP(unit_mgr));
K(tenant_id));
} else if (OB_FAIL(dr_ls_info.get_replica_cnt(replica_cnt))) {
LOG_WARN("failed to get replica count", KR(ret));
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_));
} else if (OB_FAIL(unit_operator_.init(*GCTX.sql_proxy_))) {
LOG_WARN("init unit table operator failed", KR(ret));
} else {
const int64_t hash_count = max(replica_cnt, 1);
if (OB_FAIL(unit_set_.create(hash::cal_next_prime(hash_count)))) {
@ -1535,7 +1532,6 @@ int ObDRWorker::UnitProvider::init(
LOG_WARN("failed to init unit set", KR(ret), K(dr_ls_info));
} else {
tenant_id_ = tenant_id;
unit_mgr_ = unit_mgr;
inited_ = true;
}
}
@ -1578,7 +1574,7 @@ int ObDRWorker::UnitProvider::init_unit_set(
} else if ((ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type())
&& ls_replica->get_in_member_list())
|| (!ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type()))) {
if (OB_FAIL(unit_set_.set_refactored(unit_stat_info->get_unit_info().unit_.unit_id_))) {
if (OB_FAIL(unit_set_.set_refactored(unit_stat_info->get_unit().unit_id_))) {
LOG_WARN("fail to set refactored", KR(ret));
}
}
@ -1589,40 +1585,40 @@ int ObDRWorker::UnitProvider::init_unit_set(
int ObDRWorker::UnitProvider::inner_get_valid_unit_(
const common::ObZone &zone,
const common::ObArray<share::ObUnitInfo> &unit_array,
share::ObUnitInfo &output_unit_info,
const common::ObArray<share::ObUnit> &unit_array,
share::ObUnit &output_unit,
const bool &force_get,
bool &found)
{
int ret = OB_SUCCESS;
output_unit_info.reset();
output_unit.reset();
found = false;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_ISNULL(unit_mgr_) || OB_UNLIKELY(0 >= unit_array.count())) {
} else if (OB_UNLIKELY(0 >= unit_array.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("unit mgr ptr is null", KR(ret), KP(unit_mgr_), "unit_count", unit_array.count());
LOG_WARN("unit mgr ptr is null", KR(ret), "unit_count", unit_array.count());
} else {
bool server_is_active = false;
for (int64_t i = 0; OB_SUCC(ret) && i < unit_array.count(); ++i) {
server_is_active = false;
const share::ObUnitInfo &unit_info = unit_array.at(i);
const uint64_t unit_id = unit_info.unit_.unit_id_;
const share::ObUnit &unit = unit_array.at(i);
const uint64_t unit_id = unit.unit_id_;
int hash_ret = OB_SUCCESS;
bool server_and_unit_status_is_valid = true;
if (unit_info.unit_.zone_ != zone) {
if (unit.zone_ != zone) {
// bypass, because we do not support operation between different zones
} else {
if (!force_get) {
if (OB_FAIL(SVR_TRACER.check_server_active(unit_info.unit_.server_, server_is_active))) {
LOG_WARN("fail to check server active", KR(ret), "server", unit_info.unit_.server_);
if (OB_FAIL(SVR_TRACER.check_server_active(unit.server_, server_is_active))) {
LOG_WARN("fail to check server active", KR(ret), "server", unit.server_);
} else if (!server_is_active) {
server_and_unit_status_is_valid = false;
FLOG_INFO("server is not active", "server", unit_info.unit_.server_, K(server_is_active));
} else if (!unit_info.unit_.is_active_status()) {
FLOG_INFO("server is not active", "server", unit.server_, K(server_is_active));
} else if (!unit.is_active_status()) {
server_and_unit_status_is_valid = false;
FLOG_INFO("unit status is not normal", K(unit_info));
FLOG_INFO("unit status is not normal", K(unit));
} else {
server_and_unit_status_is_valid = true;
}
@ -1633,8 +1629,8 @@ int ObDRWorker::UnitProvider::inner_get_valid_unit_(
FLOG_INFO("unit existed", K(unit_id));
} else if (OB_FAIL(hash_ret)) {
LOG_WARN("set refactored failed", KR(ret), KR(hash_ret));
} else if (OB_FAIL(output_unit_info.assign(unit_info))) {
LOG_WARN("fail to assign unit info", KR(ret), K(unit_info));
} else if (OB_FAIL(output_unit.assign(unit))) {
LOG_WARN("fail to assign unit info", KR(ret), K(unit));
} else {
found = true;
break;
@ -1648,31 +1644,31 @@ int ObDRWorker::UnitProvider::inner_get_valid_unit_(
int ObDRWorker::UnitProvider::allocate_unit(
const common::ObZone &zone,
const uint64_t unit_group_id,
share::ObUnitInfo &unit_info)
share::ObUnit &unit)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else {
common::ObArray<ObUnitInfo> unit_array;
common::ObArray<ObUnit> unit_array;
bool found = false;
bool force_get = true; // if unit_group_id is given, just allocate unit belongs to this unit group
// 1. if unit_group_id is valid, try get valid unit in this unit group
if (unit_group_id > 0) {
force_get = true;
if (OB_FAIL(unit_mgr_->get_unit_group(tenant_id_, unit_group_id, unit_array))) {
LOG_WARN("fail to get unit group", KR(ret), K(tenant_id_), K(unit_group_id));
} else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit_info, force_get, found))) {
if (OB_FAIL(unit_operator_.get_units_by_unit_group_id(unit_group_id, unit_array))) {
LOG_WARN("fail to get unit group", KR(ret), K(unit_group_id));
} else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit, force_get, found))) {
LOG_WARN("fail to get valid unit from certain unit group", KR(ret), K(zone), K(unit_array), K(force_get));
}
} else {
// 2. if unit_group_id = 0, try get from all units
unit_array.reset();
force_get = false;
if (OB_FAIL(unit_mgr_->get_all_unit_infos_by_tenant(tenant_id_, unit_array))) {
if (OB_FAIL(unit_operator_.get_units_by_tenant(tenant_id_, unit_array))) {
LOG_WARN("fail to get ll unit infos by tenant", KR(ret), K(tenant_id_));
} else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit_info, force_get, found))) {
} else if (OB_FAIL(inner_get_valid_unit_(zone, unit_array, unit, force_get, found))) {
LOG_WARN("fail to get valid unit from all units in tenant", KR(ret), K(zone), K(unit_array), K(force_get));
}
}
@ -1709,7 +1705,6 @@ ObDRWorker::ObDRWorker(volatile bool &stop)
dr_task_mgr_is_loaded_(false),
self_addr_(),
config_(nullptr),
unit_mgr_(nullptr),
zone_mgr_(nullptr),
disaster_recovery_task_mgr_(nullptr),
lst_operator_(nullptr),
@ -1729,7 +1724,6 @@ ObDRWorker::~ObDRWorker()
int ObDRWorker::init(
common::ObAddr &self_addr,
common::ObServerConfig &config,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
ObDRTaskMgr &task_mgr,
share::ObLSTableOperator &lst_operator,
@ -1747,7 +1741,6 @@ int ObDRWorker::init(
} else {
self_addr_ = self_addr;
config_ = &config;
unit_mgr_ = &unit_mgr;
zone_mgr_ = &zone_mgr;
disaster_recovery_task_mgr_ = &task_mgr;
lst_operator_ = &lst_operator;
@ -1841,7 +1834,6 @@ void ObDRWorker::statistic_total_dr_task(const int64_t task_cnt)
int ObDRWorker::check_tenant_locality_match(
const uint64_t tenant_id,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
bool &locality_is_matched)
{
@ -1868,7 +1860,6 @@ int ObDRWorker::check_tenant_locality_match(
share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i);
bool filter_readonly_replicas_with_flag = true;
DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id),
&unit_mgr,
&zone_mgr,
GCTX.schema_service_);
if (ls_status_info.ls_is_creating()) {
@ -1889,7 +1880,7 @@ int ObDRWorker::check_tenant_locality_match(
LOG_WARN("fail to generate dr log stream info", KR(ret), K(ls_info),
K(ls_status_info), K(filter_readonly_replicas_with_flag));
} else if (OB_FAIL(check_ls_locality_match_(
dr_ls_info, unit_mgr, zone_mgr, locality_is_matched))) {
dr_ls_info, zone_mgr, locality_is_matched))) {
LOG_WARN("fail to try log stream disaster recovery", KR(ret));
}
}
@ -1900,7 +1891,6 @@ int ObDRWorker::check_tenant_locality_match(
int ObDRWorker::check_ls_locality_match_(
DRLSInfo &dr_ls_info,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
bool &locality_is_matched)
{
@ -1908,8 +1898,7 @@ int ObDRWorker::check_ls_locality_match_(
int tmp_ret = OB_SUCCESS;
locality_is_matched = false;
LOG_INFO("start to check ls locality match", K(dr_ls_info));
LocalityAlignment locality_alignment(&unit_mgr,
&zone_mgr,
LocalityAlignment locality_alignment(&zone_mgr,
dr_ls_info);
if (!dr_ls_info.has_leader()) {
LOG_WARN("has no leader, maybe not report yet",
@ -2031,12 +2020,10 @@ int ObDRWorker::try_tenant_disaster_recovery(
share::ObLSStatusInfo &ls_status_info = ls_status_info_array.at(i);
// this structure is used to generate migrtion/locality alignment/shrink unit tasks
DRLSInfo dr_ls_info_without_flag(gen_user_tenant_id(tenant_id),
unit_mgr_,
zone_mgr_,
schema_service_);
// this structure is used to generate permanent offline tasks
DRLSInfo dr_ls_info_with_flag(gen_user_tenant_id(tenant_id),
unit_mgr_,
zone_mgr_,
schema_service_);
int64_t ls_acc_dr_task = 0;
@ -2660,7 +2647,7 @@ int ObDRWorker::check_need_generate_replicate_to_unit(
} else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status()
&& unit_stat_info->is_in_pool()
&& !server_stat_info->is_alive()
&& unit_stat_info->get_unit_info().unit_.server_ != server_stat_info->get_server()
&& unit_stat_info->get_unit().server_ != server_stat_info->get_server()
&& unit_stat_info->get_server_stat()->is_alive()
&& !unit_stat_info->get_server_stat()->is_block()) {
need_generate = true;
@ -2700,8 +2687,8 @@ int ObDRWorker::construct_extra_infos_to_build_migrate_task(
data_size))) {
LOG_WARN("fail to choose disaster recovery data source", KR(ret));
} else if (OB_FAIL(dst_replica.assign(
unit_stat_info.get_unit_info().unit_.unit_id_,
unit_in_group_stat_info.get_unit_info().unit_.unit_group_id_,
unit_stat_info.get_unit().unit_id_,
unit_in_group_stat_info.get_unit().unit_group_id_,
ls_replica.get_zone(),
dst_member))) {
LOG_WARN("fail to assign dst replica", KR(ret));
@ -2803,7 +2790,7 @@ int ObDRWorker::try_replicate_to_unit(
need_generate))) {
LOG_WARN("fail to check need generate replicate to unit task", KR(ret));
} else if (need_generate) {
ObReplicaMember dst_member(unit_stat_info->get_unit_info().unit_.server_,
ObReplicaMember dst_member(unit_stat_info->get_unit().server_,
ObTimeUtility::current_time(),
ls_replica->get_replica_type(),
ls_replica->get_memstore_percent());
@ -2865,11 +2852,11 @@ int ObDRWorker::generate_migrate_ls_task(
if (OB_FAIL(display_info.init(
tenant_id, ls_id, ObDRTaskType::LS_MIGRATE_REPLICA,
ObDRTaskPriority::HIGH_PRI,
unit_stat_info.get_unit_info().unit_.server_,
unit_stat_info.get_unit().server_,
ls_replica.get_replica_type(), old_paxos_replica_number,
ls_replica.get_server(), ls_replica.get_replica_type(),
old_paxos_replica_number,
unit_stat_info.get_unit_info().unit_.server_,
unit_stat_info.get_unit().server_,
task_comment))) {
LOG_WARN("fail to init a ObLSReplicaTaskDisplayInfo", KR(ret));
} else if (OB_FAIL(add_display_info(display_info))) {
@ -3436,7 +3423,7 @@ int ObDRWorker::try_locality_alignment(
int ret = OB_SUCCESS;
DEBUG_SYNC(BEFORE_TRY_LOCALITY_ALIGNMENT);
LOG_INFO("try locality alignment", K(dr_ls_info), K(only_for_display));
LocalityAlignment locality_alignment(unit_mgr_, zone_mgr_, dr_ls_info);
LocalityAlignment locality_alignment(zone_mgr_, dr_ls_info);
const LATask *task = nullptr;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
@ -3526,8 +3513,7 @@ int ObDRWorker::try_shrink_resource_pools(
} else {
ObDRWorker::UnitProvider unit_provider;
const uint64_t tenant_id = ls_status_info->tenant_id_;
if (OB_FAIL(unit_provider.init(gen_user_tenant_id(tenant_id), dr_ls_info,
unit_mgr_))) {
if (OB_FAIL(unit_provider.init(gen_user_tenant_id(tenant_id), dr_ls_info))) {
LOG_WARN("fail to init unit provider", KR(ret), K(tenant_id), K(dr_ls_info));
}
for (int64_t index = 0; OB_SUCC(ret) && index < replica_cnt; ++index) {
@ -3558,7 +3544,7 @@ int ObDRWorker::try_shrink_resource_pools(
KP(unit_stat_info),
KP(unit_in_group_stat_info));
} else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status()
&& share::ObUnit::UNIT_STATUS_DELETING == unit_stat_info->get_unit_info().unit_.status_) {
&& share::ObUnit::UNIT_STATUS_DELETING == unit_stat_info->get_unit().status_) {
// replica is still in member_list, but unit is in DELETING status
// If this is a duplicate log stream
// 1.1 for R-replica: execute remove_learner task directly
@ -3612,7 +3598,7 @@ int ObDRWorker::try_shrink_resource_pools(
} else {
// generate task for normal log stream replica
if (0 == ls_status_info->ls_group_id_
|| ls_status_info->unit_group_id_ != unit_stat_info->get_unit_info().unit_.unit_group_id_) {
|| ls_status_info->unit_group_id_ != unit_stat_info->get_unit().unit_group_id_) {
//If the Unit Group is in the DELETING status, we need to migrate out the LS that do not belong to that Unit Group.
//LS belonging to this Unit Group will be automatically processed by Balance module
// 2.1 try generate and execute migrate replica for normal log stream
@ -3767,7 +3753,7 @@ int ObDRWorker::try_migrate_replica_for_deleting_unit_(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(ls_replica), K(ls_status_info));
} else {
share::ObUnitInfo dest_unit;
share::ObUnit dest_unit;
if (OB_FAIL(unit_provider.allocate_unit(
ls_replica.get_zone(),
ls_status_info.unit_group_id_,
@ -3782,7 +3768,7 @@ int ObDRWorker::try_migrate_replica_for_deleting_unit_(
}
} else {
ObReplicaMember dst_member(
dest_unit.unit_.server_,
dest_unit.server_,
ObTimeUtility::current_time(),
ls_replica.get_replica_type(),
ls_replica.get_memstore_percent());
@ -3977,8 +3963,8 @@ int ObDRWorker::find_valid_readonly_replica_(
if (OB_FAIL(target_replica.assign(*replica))) {
LOG_WARN("fail to assign replica", KR(ret), KPC(replica));
} else {
unit_id = unit_stat_info->get_unit_info().unit_.unit_id_;
unit_group_id = unit_stat_info->get_unit_info().unit_.unit_group_id_;
unit_id = unit_stat_info->get_unit().unit_id_;
unit_group_id = unit_stat_info->get_unit().unit_group_id_;
find_a_valid_readonly_replica = true;
LOG_INFO("find a valid readonly replica to do type transform", K(dr_ls_info),
K(exclude_replica), K(target_zone), K(target_replica));
@ -4134,7 +4120,7 @@ int ObDRWorker::check_need_generate_cancel_unit_migration_task(
KP(unit_stat_info),
KP(unit_in_group_stat_info));
} else if (unit_stat_info->is_in_pool()
&& unit_stat_info->get_unit_info().unit_.migrate_from_server_.is_valid()
&& unit_stat_info->get_unit().migrate_from_server_.is_valid()
&& unit_stat_info->get_server_stat()->is_block()
&& ls_replica->get_server() == unit_stat_info->get_server_stat()->get_server()) {
if (ObReplicaTypeCheck::is_paxos_replica_V2(ls_replica->get_replica_type())
@ -4415,14 +4401,14 @@ int ObDRWorker::check_need_generate_migrate_to_unit_task(
KP(unit_in_group_stat_info));
} else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status()
&& unit_in_group_stat_info->is_in_pool()
&& server_stat_info->get_server() != unit_in_group_stat_info->get_unit_info().unit_.server_
&& server_stat_info->get_server() != unit_in_group_stat_info->get_unit().server_
&& unit_in_group_stat_info->get_server_stat()->is_alive()
&& !unit_in_group_stat_info->get_server_stat()->is_block()) {
need_generate = true;
is_unit_in_group_related = true;
} else if (REPLICA_STATUS_NORMAL == ls_replica->get_replica_status()
&& unit_stat_info->is_in_pool()
&& server_stat_info->get_server() != unit_stat_info->get_unit_info().unit_.server_
&& server_stat_info->get_server() != unit_stat_info->get_unit().server_
&& unit_stat_info->get_server_stat()->is_alive()
&& !unit_stat_info->get_server_stat()->is_block()) {
need_generate = true;
@ -4464,9 +4450,9 @@ int ObDRWorker::construct_extra_infos_for_generate_migrate_to_unit_task(
LOG_WARN("fail to choose disaster recovery data source", KR(ret));
} else if (OB_FAIL(dst_replica.assign(
is_unit_in_group_related
? unit_in_group_stat_info.get_unit_info().unit_.unit_id_
: unit_stat_info.get_unit_info().unit_.unit_id_,
unit_in_group_stat_info.get_unit_info().unit_.unit_group_id_,
? unit_in_group_stat_info.get_unit().unit_id_
: unit_stat_info.get_unit().unit_id_,
unit_in_group_stat_info.get_unit().unit_group_id_,
ls_replica.get_zone(),
dst_member))) {
LOG_WARN("fail to assign dst replica", KR(ret));
@ -4602,8 +4588,8 @@ int ObDRWorker::try_migrate_to_unit(
ls_replica->get_replica_type(),
ls_replica->get_memstore_percent());
ObReplicaMember dst_member(is_unit_in_group_related
? unit_in_group_stat_info->get_unit_info().unit_.server_
: unit_stat_info->get_unit_info().unit_.server_,
? unit_in_group_stat_info->get_unit().server_
: unit_stat_info->get_unit().server_,
ObTimeUtility::current_time(),
ls_replica->get_replica_type(),
ls_replica->get_memstore_percent());
@ -4641,16 +4627,16 @@ int ObDRWorker::try_migrate_to_unit(
ObDRTaskType::LS_MIGRATE_REPLICA,
ObDRTaskPriority::LOW_PRI,
is_unit_in_group_related
? unit_in_group_stat_info->get_unit_info().unit_.server_
: unit_stat_info->get_unit_info().unit_.server_,
? unit_in_group_stat_info->get_unit().server_
: unit_stat_info->get_unit().server_,
ls_replica->get_replica_type(),
old_paxos_replica_number,
ls_replica->get_server(),
ls_replica->get_replica_type(),
old_paxos_replica_number,
is_unit_in_group_related
? unit_in_group_stat_info->get_unit_info().unit_.server_
: unit_stat_info->get_unit_info().unit_.server_,
? unit_in_group_stat_info->get_unit().server_
: unit_stat_info->get_unit().server_,
comment_to_set))) {
LOG_WARN("fail to init a ObLSReplicaTaskDisplayInfo", KR(ret));
} else if (OB_FAIL(add_display_info(display_info))) {

View File

@ -108,7 +108,6 @@ public:
int init(
common::ObAddr &self_addr,
common::ObServerConfig &cfg,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
ObDRTaskMgr &task_mgr,
share::ObLSTableOperator &lst_operator,
@ -122,7 +121,6 @@ public:
int64_t &acc_dr_task);
static int check_tenant_locality_match(
const uint64_t tenant_id,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
bool &locality_is_matched);
@ -516,30 +514,28 @@ private:
UnitProvider()
: inited_(false),
tenant_id_(OB_INVALID_ID),
unit_mgr_(nullptr),
unit_set_() {}
int init(
const uint64_t tenant_id,
DRLSInfo &dr_ls_info,
ObUnitManager *unit_mgr);
DRLSInfo &dr_ls_info);
int allocate_unit(
const common::ObZone &zone,
const uint64_t unit_group_id,
share::ObUnitInfo &unit_info);
share::ObUnit &unit);
int init_unit_set(
DRLSInfo &dr_ls_info);
private:
int inner_get_valid_unit_(
const common::ObZone &zone,
const common::ObArray<share::ObUnitInfo> &unit_array,
share::ObUnitInfo &output_unit_info,
const common::ObArray<share::ObUnit> &unit_array,
share::ObUnit &output_unit,
const bool &force_get,
bool &found);
private:
bool inited_;
uint64_t tenant_id_;
ObUnitManager *unit_mgr_;
share::ObUnitTableOperator unit_operator_;
common::hash::ObHashSet<int64_t> unit_set_;
};
@ -552,7 +548,7 @@ private:
class LocalityAlignment
{
public:
LocalityAlignment(ObUnitManager *unit_mgr, ObZoneManager *zone_mgr, DRLSInfo &dr_ls_info);
LocalityAlignment(ObZoneManager *zone_mgr, DRLSInfo &dr_ls_info);
virtual ~LocalityAlignment();
int build();
int get_next_locality_alignment_task(
@ -645,7 +641,6 @@ private:
private:
int64_t task_idx_;
AddReplicaLATask add_replica_task_;
ObUnitManager *unit_mgr_;
ObZoneManager *zone_mgr_;
DRLSInfo &dr_ls_info_;
common::ObArray<LATask *> task_array_;
@ -661,7 +656,6 @@ private:
static int check_ls_locality_match_(
DRLSInfo &dr_ls_info,
ObUnitManager &unit_mgr,
ObZoneManager &zone_mgr,
bool &locality_is_matched);
@ -1058,7 +1052,6 @@ private:
bool dr_task_mgr_is_loaded_;
common::ObAddr self_addr_;
common::ObServerConfig *config_;
ObUnitManager *unit_mgr_;
ObZoneManager *zone_mgr_;
ObDRTaskMgr *disaster_recovery_task_mgr_;
share::ObLSTableOperator *lst_operator_;

View File

@ -171,7 +171,6 @@ int ObMigrateUnitFinishChecker::try_check_migrate_unit_finish_by_tenant(
} else {
LOG_INFO("try check migrate unit finish by tenant", K(tenant_id));
DRLSInfo dr_ls_info(gen_user_tenant_id(tenant_id),
unit_mgr_,
zone_mgr_,
schema_service_);
ObLSStatusInfoArray ls_status_info_array;
@ -252,11 +251,11 @@ int ObMigrateUnitFinishChecker::statistic_migrate_unit_by_ls(
KP(server_stat_info),
KP(unit_stat_info),
KP(unit_in_group_stat_info));
} else if (server_stat_info->get_server() != unit_stat_info->get_unit_info().unit_.server_
} else if (server_stat_info->get_server() != unit_stat_info->get_unit().server_
&& (ls_replica->is_in_service() || ls_status_info.ls_is_creating())) {
unit_stat_info->inc_outside_replica_cnt();
if (unit_stat_info->get_outside_replica_cnt() <= 2) { // print the first two outside replica
LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit_info().unit_);
LOG_INFO("outside replica", KPC(ls_replica), "unit", unit_stat_info->get_unit());
}
}
}
@ -280,12 +279,12 @@ int ObMigrateUnitFinishChecker::try_finish_migrate_unit(
for (; OB_SUCC(ret) && iter != inner_hash_table.end(); ++iter) {
const DRUnitStatInfo &unit_stat_info = iter->v_;
if (unit_stat_info.is_in_pool()
&& unit_stat_info.get_unit_info().unit_.migrate_from_server_.is_valid()
&& unit_stat_info.get_unit().migrate_from_server_.is_valid()
&& 0 == unit_stat_info.get_outside_replica_cnt()) {
if (OB_FAIL(unit_mgr_->finish_migrate_unit(
unit_stat_info.get_unit_info().unit_.unit_id_))) {
unit_stat_info.get_unit().unit_id_))) {
LOG_WARN("fail to set unit migrate finish", KR(ret),
"unit_id", unit_stat_info.get_unit_info().unit_.unit_id_);
"unit_id", unit_stat_info.get_unit().unit_id_);
}
}
}

View File

@ -79,7 +79,7 @@ int ObRootBalancer::init(common::ObServerConfig &cfg,
} else if (OB_FAIL(create(root_balancer_thread_cnt, "RootBalance"))) {
LOG_WARN("create root balancer thread failed", K(ret), K(root_balancer_thread_cnt));
} else if (OB_FAIL(disaster_recovery_worker_.init(
self_addr, cfg, unit_mgr, zone_mgr,
self_addr, cfg, zone_mgr,
dr_task_mgr, *GCTX.lst_operator_, schema_service, rpc_proxy, sql_proxy))) {
LOG_WARN("fail to init disaster recovery worker", KR(ret));
} else if (OB_FAIL(rootservice_util_checker_.init(

View File

@ -111,46 +111,6 @@ bool ObRootMinorFreeze::is_server_alive(const ObAddr &server) const
return is_alive;
}
int ObRootMinorFreeze::get_tenant_server_list(uint64_t tenant_id,
ObIArray<ObAddr> &target_server_list) const
{
int ret = OB_SUCCESS;
target_server_list.reset();
ObSEArray<uint64_t, 2> pool_ids;
if (OB_FAIL(unit_manager_->get_pool_ids_of_tenant(tenant_id, pool_ids))) {
LOG_WARN("fail to get pool ids of tenant", K(tenant_id), K(ret));
} else {
ObSEArray<share::ObUnitInfo, 4> units;
for (int i = 0; OB_SUCC(ret) && i < pool_ids.count(); ++i) {
units.reset();
if (OB_FAIL(unit_manager_->get_unit_infos_of_pool(pool_ids.at(i), units))) {
LOG_WARN("fail to get unit infos of pool", K(pool_ids.at(i)), K(ret));
} else {
for (int j = 0; j < units.count(); ++j) {
if (OB_LIKELY(units.at(j).is_valid())) {
const share::ObUnit &unit = units.at(j).unit_;
if (is_server_alive(unit.migrate_from_server_)) {
if (OB_FAIL(target_server_list.push_back(unit.migrate_from_server_))) {
LOG_WARN("fail to push server, ", K(ret));
}
}
if (is_server_alive(unit.server_)) {
if (OB_FAIL(target_server_list.push_back(unit.server_))) {
LOG_WARN("fail to push server, ", K(ret));
}
}
}
}
}
}
}
return ret;
}
int ObRootMinorFreeze::try_minor_freeze(const obrpc::ObRootMinorFreezeArg &arg) const
{
int ret = OB_SUCCESS;
@ -334,7 +294,10 @@ int ObRootMinorFreeze::init_params_by_tenant(const ObIArray<uint64_t> &tenant_id
}
} else {
// TODO: filter servers according to tenant_id
if (OB_FAIL(get_tenant_server_list(tenant_ids.at(i), target_server_list))) {
if (OB_ISNULL(unit_manager_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unit_manager_ is null", KR(ret), KP(unit_manager_));
} else if (OB_FAIL(unit_manager_->get_tenant_alive_servers_non_block(tenant_ids.at(i), target_server_list))) {
LOG_WARN("fail to get tenant server list, ", K(ret));
} else {
bool server_in_zone = false;

View File

@ -97,8 +97,6 @@ private:
int check_cancel() const;
bool is_server_alive(const common::ObAddr &server) const;
int get_tenant_server_list(uint64_t tenant_id,
common::ObIArray<common::ObAddr> &target_server_list) const;
bool inited_;

View File

@ -54,7 +54,6 @@ int ObRootServiceUtilChecker::init(
schema_service,
common_rpc_proxy,
self,
unit_mgr,
zone_mgr,
sql_proxy,
lst_operator))) {

View File

@ -743,6 +743,44 @@ int ObUnitTableOperator::get_units_by_unit_group_id(
return ret;
}
int ObUnitTableOperator::get_unit_in_group(
const uint64_t unit_group_id,
const common::ObZone &zone,
share::ObUnit &unit)
{
int ret = OB_SUCCESS;
common::ObArray<share::ObUnit> unit_array;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(zone.is_empty() || OB_INVALID_ID == unit_group_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(zone), K(unit_group_id));
} else if (OB_FAIL(get_units_by_unit_group_id(unit_group_id, unit_array))) {
LOG_WARN("fail to get unit group", KR(ret), K(unit_group_id));
} else {
bool found = false;
for (int64_t i = 0; !found && OB_SUCC(ret) && i < unit_array.count(); ++i) {
const share::ObUnit &this_unit = unit_array.at(i);
if (this_unit.zone_ != zone) {
// bypass
} else if (OB_FAIL(unit.assign(this_unit))) {
LOG_WARN("fail to assign unit info", KR(ret));
} else {
found = true;
}
}
if (OB_FAIL(ret)) {
// failed
} else if (!found) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("unit not found", KR(ret), K(unit_group_id), K(zone));
} else {} // good
}
return ret;
}
int ObUnitTableOperator::get_units_by_resource_pools(
const ObIArray<share::ObResourcePoolName> &pools,
common::ObIArray<ObUnit> &units)

View File

@ -79,6 +79,16 @@ public:
int get_units_by_unit_group_id(const uint64_t unit_group_id,
common::ObIArray<ObUnit> &units);
// get unit in specific unit_group and zone, if such a unit does not exist,
// then return ret == OB_ENTRY_NOT_EXIST
// @param [in] unit_group_id, target unit_group_id
// @param [in] zone, target zone
// @param [out] unit, unit in specific unit_group_id and zone
int get_unit_in_group(const uint64_t unit_group_id,
const common::ObZone &zone,
share::ObUnit &unit);
int get_units_by_resource_pools(const ObIArray<share::ObResourcePoolName> &pools,
common::ObIArray<ObUnit> &units);
int get_units_by_tenant(const uint64_t tenant_id,

View File

@ -58,6 +58,26 @@ void ObUnit::reset()
replica_type_ = REPLICA_TYPE_FULL;
}
int ObUnit::assign(const ObUnit& that)
{
int ret = OB_SUCCESS;
if (this == &that) {
//skip
} else if (OB_FAIL(zone_.assign(that.zone_))) {
LOG_WARN("zone_ assign failed", KR(ret), K(that.zone_));
} else {
unit_id_ = that.unit_id_;
resource_pool_id_ = that.resource_pool_id_;
unit_group_id_ = that.unit_group_id_;
server_ = that.server_;
migrate_from_server_ = that.migrate_from_server_;
is_manual_migrate_ = that.is_manual_migrate_;
status_ = that.status_;
replica_type_ = that.replica_type_;
}
return ret;
}
bool ObUnit::is_valid() const
{
// it's ok for migrate_from_server to be invalid

View File

@ -42,6 +42,7 @@ public:
ObUnit();
~ObUnit() {}
inline bool operator <(const ObUnit &unit) const;
int assign(const ObUnit& that);
void reset();
bool is_valid() const;
bool is_manual_migrate() const { return is_manual_migrate_; }