Decoupling stop server and leader switch

This commit is contained in:
zs0 2021-08-20 16:51:02 +08:00 committed by wangzelin.wzl
parent f7e6f965e6
commit 69608af97a
5 changed files with 368 additions and 208 deletions

View File

@ -2514,6 +2514,75 @@ int ObLeaderCoordinator::is_last_switch_turn_succ(bool& is_succ)
return ret;
}
int ObLeaderCoordinator::init_stop_server_info_set(StopServerInfoSet &stop_server_info_set)
{
int ret = OB_SUCCESS;
common::ObArray<ObAddr> server_lists;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_ISNULL(server_mgr_) || OB_ISNULL(zone_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("server mgr or zone mgr is null", KR(ret));
} else if (OB_FAIL(server_mgr_->get_all_server_list(server_lists))) {
LOG_WARN("fail to get all server list", KR(ret));
} else {
for (int64_t i = 0; i < server_lists.count() && OB_SUCC(ret); ++i) {
const ObAddr &server = server_lists.at(i);
bool zone_active = false;
bool has_stopped_server = false;
ObServerStatus server_status;
if (OB_FAIL(server_mgr_->check_server_stopped(server, has_stopped_server))) {
LOG_WARN("fail to check server stop", KR(ret));
} else if (has_stopped_server) {
if (OB_FAIL(stop_server_info_set.set_refactored(server))) {
LOG_WARN("fail to set stop server info set", KR(ret), K(server));
}
} else if (OB_FAIL(server_mgr_->get_server_status(server, server_status))) {
LOG_WARN("fail to get server status", KR(ret), K(server));
} else if (OB_FAIL(zone_mgr_->check_zone_active(server_status.zone_, zone_active))) {
LOG_WARN("fail to check zone active", KR(ret), K(server));
} else if (!zone_active) {
if (OB_FAIL(stop_server_info_set.set_refactored(server))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to stop server info set", KR(ret), K(server));
}
}
}
}
}
return ret;
}
int ObLeaderCoordinator::build_stop_server_info_set(StopServerInfoSet &stop_server_info_set, bool &skip_switch_leader)
{
int ret = OB_SUCCESS;
const int64_t bucket_num = hash::cal_next_prime(BUCKET_NUM);
bool has_stop_server_or_stop_zone = false;
skip_switch_leader = false;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_ISNULL(config_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config is null", KR(ret));
} else if (OB_FAIL(stop_server_info_set.create(bucket_num))) {
LOG_WARN("fail to create stop server info map", KR(ret), K(bucket_num));
} else if (config_->enable_auto_leader_switch) {
// nothing todo
} else if (OB_FAIL(init_stop_server_info_set(stop_server_info_set))) {
LOG_WARN("fail to build stop server info set", KR(ret));
} else if (0 == stop_server_info_set.size()) {
skip_switch_leader = true;
LOG_INFO("enable_auto_leader_switch is disable and don't have stop server,"
"skip do coordinate");
} else {
}
return ret;
}
int ObLeaderCoordinator::coordinate()
{
int ret = OB_SUCCESS;
@ -2526,21 +2595,25 @@ int ObLeaderCoordinator::coordinate()
ObLatchWGuard guard(lock_, ObLatchIds::LEADER_COORDINATOR_LOCK);
switch_info_stat_.clear_switch_limited();
StopServerInfoSet stop_server_info_set;
bool skip_switch_leader = false;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (!config_->enable_auto_leader_switch) {
LOG_INFO("enable_auto_leader_switch is disable, skip do coordinate");
} else {
if (OB_FAIL(switch_info_stat_.prepare_next_turn(stop_, *server_mgr_, switch_leader_duration_time))) {
LOG_WARN("failed to prepare for new turn", K(ret));
} else if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", K(ret));
} else if (OB_FAIL(coordinate_helper(partition_info_container_, excluded_zones, excluded_servers, tenant_ids))) {
LOG_WARN("coordinate failed", K(ret));
} else if (OB_FAIL(update_leader_stat())) {
LOG_WARN("update_leader_stat failed", K(ret));
}
} else if (OB_FAIL(build_stop_server_info_set(stop_server_info_set, skip_switch_leader))) {
LOG_WARN("fail to build_stop_server_info_set", KR(ret));
} else if (skip_switch_leader) {
// nothing todo
} else if (OB_FAIL(switch_info_stat_.prepare_next_turn(stop_, *server_mgr_, switch_leader_duration_time))) {
LOG_WARN("failed to prepare for new turn", K(ret));
} else if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", K(ret));
} else if (OB_FAIL(coordinate_helper(
partition_info_container_, excluded_zones, excluded_servers, tenant_ids, stop_server_info_set))) {
LOG_WARN("coordinate failed", K(ret));
} else if (OB_FAIL(update_leader_stat())) {
LOG_WARN("update_leader_stat failed", K(ret));
}
return ret;
}
@ -2559,18 +2632,25 @@ int ObLeaderCoordinator::auto_coordinate()
DEBUG_SYNC(BEFORE_AUTO_COORDINATE);
switch_info_stat_.clear_switch_limited();
if (!config_->enable_auto_leader_switch) {
LOG_INFO("enable_auto_leader_switch is disabled, skip auto_coordinate");
} else {
if (OB_FAIL(switch_info_stat_.prepare_next_turn(stop_, *server_mgr_, switch_leader_duration_time))) {
LOG_WARN("failed to prepare for new turn", K(ret));
} else if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", K(ret));
} else if (OB_FAIL(coordinate_helper(partition_info_container_, excluded_zones, excluded_servers, tenant_ids))) {
LOG_WARN("coordinate failed", K(ret));
} else if (OB_FAIL(update_leader_stat())) {
LOG_WARN("update_leader_stat failed", K(ret));
}
StopServerInfoSet stop_server_info_set;
bool skip_switch_leader = false;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(build_stop_server_info_set(stop_server_info_set, skip_switch_leader))) {
LOG_WARN("fail to build stop server info map", KR(ret), K(skip_switch_leader));
} else if (skip_switch_leader) {
// nothing todo
} else if (OB_FAIL(switch_info_stat_.prepare_next_turn(stop_, *server_mgr_, switch_leader_duration_time))) {
LOG_WARN("failed to prepare for new turn", K(ret));
} else if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", K(ret));
} else if (OB_FAIL(coordinate_helper(
partition_info_container_, excluded_zones, excluded_servers, tenant_ids, stop_server_info_set))) {
LOG_WARN("coordinate failed", K(ret));
} else if (OB_FAIL(update_leader_stat())) {
LOG_WARN("update_leader_stat failed", K(ret));
}
if (OB_SUCC(ret)) {
@ -2614,8 +2694,9 @@ int ObLeaderCoordinator::auto_coordinate()
return ret;
}
int ObLeaderCoordinator::coordinate_tenants(const ObArray<ObZone>& excluded_zones,
const ObIArray<ObAddr>& excluded_servers, const ObIArray<uint64_t>& tenant_ids, const bool force)
int ObLeaderCoordinator::coordinate_tenants(const ObArray<ObZone> &excluded_zones,
const ObIArray<ObAddr> &excluded_servers, const ObIArray<uint64_t> &tenant_ids,
const StopServerInfoSet &stop_server_info_set, const bool force)
{
int ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, ObLatchIds::LEADER_COORDINATOR_LOCK);
@ -2631,8 +2712,12 @@ int ObLeaderCoordinator::coordinate_tenants(const ObArray<ObZone>& excluded_zone
PartitionInfoContainer partition_info_container;
if (OB_FAIL(partition_info_container.init(pt_operator_, schema_service_, this))) {
LOG_WARN("fail to init partition info container", K(ret));
} else if (OB_FAIL(
coordinate_helper(partition_info_container, excluded_zones, excluded_servers, tenant_ids, force))) {
} else if (OB_FAIL(coordinate_helper(partition_info_container,
excluded_zones,
excluded_servers,
tenant_ids,
stop_server_info_set,
force))) {
LOG_WARN("coordinate helper failed", K(excluded_zones), K(excluded_servers), K(tenant_ids), K(force), K(ret));
}
}
@ -2646,6 +2731,8 @@ int ObLeaderCoordinator::coordinate_partition_group(const uint64_t partition_ent
bool small_tenant = false;
const uint64_t tenant_id = extract_tenant_id(partition_entity_id);
const int64_t begin = ObTimeUtility::current_time();
const int64_t bucket_num = hash::cal_next_prime(BUCKET_NUM);
StopServerInfoSet stop_server_info_set;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -2658,14 +2745,19 @@ int ObLeaderCoordinator::coordinate_partition_group(const uint64_t partition_ent
} else if (OB_UNLIKELY(NULL == GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice ptr is null", K(ret));
} else if (OB_FAIL(stop_server_info_set.create(bucket_num))) {
LOG_WARN("fail to create stop server info map", KR(ret), K(bucket_num));
} else {
const bool force = true;
const bool has_stop_server = (0 != stop_server_info_set.size());
// whether there is a stop server has no effect
if (small_tenant) {
// coordinate full tenant partitions for small tenant
ObArray<uint64_t> tenant_ids;
if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
LOG_WARN("push tenant_id to array failed", K(ret));
} else if (OB_FAIL(coordinate_tenants(excluded_zones, excluded_servers, tenant_ids, force))) {
} else if (OB_FAIL(
coordinate_tenants(excluded_zones, excluded_servers, tenant_ids, stop_server_info_set, force))) {
LOG_WARN("coordinate tenant failed", K(ret), K(excluded_servers), K(tenant_ids), K(force));
}
} else {
@ -2744,10 +2836,15 @@ int ObLeaderCoordinator::coordinate_partition_group(const uint64_t partition_ent
cursor_container,
excluded_zones,
excluded_servers,
stop_server_info_set,
force))) {
LOG_WARN("fail to build leader switch cursor array", K(ret));
} else if (OB_FAIL(coordinate_partition_arrays(
leader_wait_operator, tenant_partition, tenant_unit, cursor_container, force))) {
} else if (OB_FAIL(coordinate_partition_arrays(leader_wait_operator,
tenant_partition,
tenant_unit,
cursor_container,
has_stop_server,
force))) {
LOG_WARN("coordinate partition arrays failed", K(ret), K(force));
}
}
@ -2839,9 +2936,9 @@ int ObLeaderCoordinator::build_leader_balance_info(share::schema::ObSchemaGetter
return ret;
}
int ObLeaderCoordinator::coordinate_helper(PartitionInfoContainer& partition_info_container,
const ObArray<ObZone>& excluded_zones, const ObIArray<ObAddr>& excluded_servers,
const ObIArray<uint64_t>& tenant_ids, const bool force)
int ObLeaderCoordinator::coordinate_helper(PartitionInfoContainer &partition_info_container,
const ObArray<ObZone> &excluded_zones, const ObIArray<ObAddr> &excluded_servers,
const ObIArray<uint64_t> &tenant_ids, const StopServerInfoSet &stop_server_info_set, const bool force)
{
int ret = OB_SUCCESS;
int bak_ret = OB_SUCCESS;
@ -2861,9 +2958,14 @@ int ObLeaderCoordinator::coordinate_helper(PartitionInfoContainer& partition_inf
} else if (OB_UNLIKELY(NULL == GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("gctx root service ptr is null", K(ret));
} else if (OB_UNLIKELY(!stop_server_info_set.created())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("stop server info map not created", KR(ret));
} else {
ObArenaAllocator allocator(ObModIds::OB_RS_LEADER_COORDINATOR_PA);
ObArray<uint64_t> new_tenant_ids;
const bool has_stop_server_or_zone = (0 != stop_server_info_set.size());
const bool new_force = const_cast<bool &>(force) || has_stop_server_or_zone;
if (OB_FAIL(move_sys_tenant_to_last(tenant_ids, new_tenant_ids))) {
LOG_WARN("move_sys_tenant_to_last failed", K(tenant_ids), K(ret));
} else {
@ -2913,12 +3015,17 @@ int ObLeaderCoordinator::coordinate_helper(PartitionInfoContainer& partition_inf
cursor_container,
excluded_zones,
excluded_servers,
stop_server_info_set,
force))) {
LOG_WARN("fail to build leader switch cursor array", K(ret));
} else if (OB_FAIL(leader_wait_operator.init(*tenant_id))) {
LOG_WARN("fail to init leader wait operator", K(ret));
} else if (OB_FAIL(coordinate_partition_arrays(
leader_wait_operator, tenant_partition, tenant_unit, cursor_container, force))) {
} else if (OB_FAIL(coordinate_partition_arrays(leader_wait_operator,
tenant_partition,
tenant_unit,
cursor_container,
has_stop_server_or_zone,
new_force))) {
LOG_WARN("coordinate partition arrays failed", K(excluded_zones), K(excluded_servers), K(force), K(ret));
} else {
} // no more to do
@ -3461,9 +3568,10 @@ int ObLeaderCoordinator::get_tenant_unit(const uint64_t tenant_id, TenantUnit& t
return ret;
}
int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_id, TenantUnit& tenant_unit,
common::ObArray<PartitionArray*>& partition_arrays, common::ObIArray<PreSwitchPgInfo>& pre_switch_index_array,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers)
int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_id, TenantUnit &tenant_unit,
common::ObArray<PartitionArray *> &partition_arrays, common::ObIArray<PreSwitchPgInfo> &pre_switch_index_array,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
const StopServerInfoSet &stop_server_info_set, const bool force)
{
int ret = OB_SUCCESS;
CandidateLeaderInfoMap leader_info_map(*this);
@ -3474,21 +3582,28 @@ int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_i
} else if (OB_INVALID_ID == tenant_id || partition_arrays.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("partition_arrays is empty", K(ret), K(tenant_id), K(partition_arrays));
} else if (OB_ISNULL(config_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config is null", KR(ret), K(tenant_id));
} else if (OB_FAIL(leader_info_map.create(MAP_BUCKET_NUM, ObModIds::OB_RS_LEADER_COORDINATOR))) {
LOG_WARN("fail to create leader info map", K(ret));
} else {
pre_switch_index_array.reset();
const bool enable_auto_leader_switch = config_->enable_auto_leader_switch;
for (int64_t i = 0; OB_SUCC(ret) && i < partition_arrays.count(); ++i) {
common::ObSEArray<common::ObAddr, 7> candidate_leaders;
PartitionArray* partition_array = partition_arrays.at(i);
bool need_switch = true;
ObPartitionKey pkey;
bool ignore_switch_percent = false;
ObAddr cur_leader;
bool same_leader = false;
bool has_leader_in_stop_server = false;
if (OB_UNLIKELY(NULL == partition_array)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("partition array is null", K(ret), KP(partition_array));
} else if (OB_ISNULL(config_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config is null", KR(ret), K(tenant_id));
} else if (OB_FAIL(check_cancel())) {
LOG_WARN("check cancel failed, maybe rs is stopped", K(ret));
} else if (OB_UNLIKELY(partition_array->count() <= 0)) {
@ -3499,23 +3614,21 @@ int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_i
} else if (!pkey.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pkey is invalid", K(ret), K(pkey));
} else if (OB_FAIL(build_partition_statistic(*partition_array,
tenant_unit,
excluded_zones,
excluded_servers,
leader_info_map,
ignore_switch_percent))) {
} else if (OB_FAIL(build_partition_statistic(
*partition_array, tenant_unit, excluded_zones, excluded_servers, leader_info_map))) {
LOG_WARN("fail to build partition statistic", K(ret));
} else if (OB_FAIL(get_cur_partition_leader(*partition_array, cur_leader, same_leader))) {
} else if (OB_FAIL(get_cur_partition_leader(
*partition_array, cur_leader, same_leader, stop_server_info_set, has_leader_in_stop_server))) {
LOG_WARN("fail to get cur partition leader", K(ret));
} else if (!same_leader) {
ignore_switch_percent = true;
LOG_INFO("leaders not in same server, force switch",
} else if (!same_leader || has_leader_in_stop_server) {
LOG_INFO("leaders not in same server or has leader in stop server, force switch",
K(tenant_id),
"first_pk",
pkey,
"part_id",
pkey.get_partition_id());
} else if (!force && !enable_auto_leader_switch) {
need_switch = false;
} else {
if (OB_FAIL(choose_leader(
tenant_id, partition_array->get_tablegroup_id(), leader_info_map, candidate_leaders, cur_leader))) {
@ -3523,8 +3636,8 @@ int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_i
} else if (candidate_leaders.count() <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("candidate leaders count must not 0", K(ret), K(candidate_leaders.count()));
} else if (has_exist_in_array(candidate_leaders, cur_leader)) {
need_switch = false; // cur leader already in candidate leaders, no need to switch
} else if (same_leader && has_exist_in_array(candidate_leaders, cur_leader)) {
need_switch = false;
}
}
if (OB_FAIL(ret) || !need_switch) {
@ -3542,10 +3655,11 @@ int ObLeaderCoordinator::build_pre_switch_pg_index_array(const uint64_t tenant_i
return ret;
}
int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit& tenant_unit,
const common::ObIArray<PreSwitchPgInfo>& pre_switch_index_array, common::ObArray<PartitionArray*>& partition_arrays,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers,
CursorContainer& cursor_container)
int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit &tenant_unit,
const common::ObIArray<PreSwitchPgInfo> &pre_switch_index_array,
common::ObArray<PartitionArray *> &partition_arrays, const ObZoneList &excluded_zones,
const common::ObIArray<common::ObAddr> &excluded_servers, CursorContainer &cursor_container,
const StopServerInfoSet &stop_server_info_set, const bool force)
{
int ret = OB_SUCCESS;
CandidateLeaderInfoMap leader_info_map(*this);
@ -3556,18 +3670,22 @@ int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t
} else if (OB_INVALID_ID == tenant_id || partition_arrays.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("partition_arrays is empty", K(ret), K(tenant_id), K(partition_arrays));
} else if (OB_ISNULL(config_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config is null", KR(ret), K(tenant_id));
} else if (OB_FAIL(leader_info_map.create(MAP_BUCKET_NUM, ObModIds::OB_RS_LEADER_COORDINATOR))) {
LOG_WARN("fail to create leader info map", K(ret));
} else {
const bool enable_auto_leader_switch = config_->enable_auto_leader_switch;
for (int64_t i = 0; OB_SUCC(ret) && i < pre_switch_index_array.count(); ++i) {
const PreSwitchPgInfo& pg_info = pre_switch_index_array.at(i);
common::ObSEArray<common::ObAddr, 7> candidate_leaders;
PartitionArray* partition_array = nullptr;
bool need_switch = true;
ObPartitionKey pkey;
bool ignore_switch_percent = false;
ObAddr cur_leader;
bool same_leader = false;
bool has_leader_in_stop_server = false;
if (OB_FAIL(check_cancel())) {
LOG_WARN("check cancel failed, maybe rs is stopped", K(ret));
} else if (!pg_info.is_valid()) {
@ -3597,28 +3715,26 @@ int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t
} else if (!pkey.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pkey is invalid", K(ret), K(pkey));
} else if (OB_FAIL(build_partition_statistic(*partition_array,
tenant_unit,
excluded_zones,
excluded_servers,
leader_info_map,
ignore_switch_percent))) {
} else if (OB_FAIL(build_partition_statistic(
*partition_array, tenant_unit, excluded_zones, excluded_servers, leader_info_map))) {
LOG_WARN("fail to build partition statistic", K(ret));
} else if (OB_FAIL(update_leader_candidates(*partition_array, leader_info_map))) {
LOG_WARN("fail to update leader candidate", K(ret));
} else if (OB_FAIL(get_cur_partition_leader(*partition_array, cur_leader, same_leader))) {
} else if (OB_FAIL(get_cur_partition_leader(
*partition_array, cur_leader, same_leader, stop_server_info_set, has_leader_in_stop_server))) {
LOG_WARN("fail to get cur partition leader", K(ret));
} else if (!same_leader) {
ignore_switch_percent = true;
} else if (!same_leader || has_leader_in_stop_server) {
LOG_INFO("leaders not in same server, force switch",
K(tenant_id),
"first_pk",
pkey,
"part_id",
pkey.get_partition_id());
} else if (!force && !enable_auto_leader_switch) {
need_switch = false;
}
if (OB_FAIL(ret)) {
if (OB_FAIL(ret) || !need_switch) {
} else if (OB_FAIL(choose_leader(tenant_id,
partition_array->get_tablegroup_id(),
leader_info_map,
@ -3629,7 +3745,7 @@ int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t
ret = OB_ERR_UNEXPECTED;
LOG_WARN("candidate leaders count shall not be 0", K(ret));
} else if (same_leader && has_exist_in_array(candidate_leaders, cur_leader)) {
need_switch = false; // cur leader already in candidate leaders, no need to switch
need_switch = false;
}
if (OB_FAIL(ret) || !need_switch) {
} else if (candidate_leaders.count() <= 0) {
@ -3638,9 +3754,8 @@ int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t
} else {
const int64_t count = candidate_leaders.count();
const int64_t idx = std::abs(static_cast<int64_t>(get_random()) % count);
ObAddr& advised_leader = candidate_leaders.at(idx);
PartitionArrayCursor cursor(
0, partition_array->count(), pg_info.index_, cur_leader, advised_leader, ignore_switch_percent);
ObAddr &advised_leader = candidate_leaders.at(idx);
PartitionArrayCursor cursor(0, partition_array->count(), pg_info.index_, cur_leader, advised_leader);
if (OB_FAIL(cursor_container.cursor_array_.push_back(cursor))) {
LOG_WARN("fail to push back", K(ret));
}
@ -3661,9 +3776,10 @@ int ObLeaderCoordinator::do_build_leader_switch_cursor_container(const uint64_t
return ret;
}
int ObLeaderCoordinator::build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit& tenant_unit,
common::ObArray<PartitionArray*>& partition_arrays, CursorContainer& cursor_container,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers, const bool force)
int ObLeaderCoordinator::build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit &tenant_unit,
common::ObArray<PartitionArray *> &partition_arrays, CursorContainer &cursor_container,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
const StopServerInfoSet &stop_server_info_set, const bool force)
{
int ret = OB_SUCCESS;
const bool enable_auto_leader_switch = config_->enable_auto_leader_switch;
@ -3674,12 +3790,24 @@ int ObLeaderCoordinator::build_leader_switch_cursor_container(const uint64_t ten
} else if (OB_INVALID_ID == tenant_id || partition_arrays.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("partition_arrays is empty", K(ret), K(tenant_id), K(partition_arrays));
} else if (!force && !enable_auto_leader_switch) {
} else if (!force && !enable_auto_leader_switch && 0 == stop_server_info_set.size()) {
// no need to switch leader, when auto leader switch is off
LOG_INFO("no need to switch leader",
K(tenant_id),
K(force),
K(enable_auto_leader_switch),
"stop server count",
stop_server_info_set.size());
} else if (OB_FAIL(pre_switch_index_array.reserve(partition_arrays.count()))) {
LOG_WARN("fail to reserve index array", K(ret));
} else if (OB_FAIL(build_pre_switch_pg_index_array(
tenant_id, tenant_unit, partition_arrays, pre_switch_index_array, excluded_zones, excluded_servers))) {
} else if (OB_FAIL(build_pre_switch_pg_index_array(tenant_id,
tenant_unit,
partition_arrays,
pre_switch_index_array,
excluded_zones,
excluded_servers,
stop_server_info_set,
force))) {
LOG_WARN("fail to pre switch pg index array", K(ret));
} else if (pre_switch_index_array.count() <= 0) {
// bypass, no need to switch
@ -3691,7 +3819,9 @@ int ObLeaderCoordinator::build_leader_switch_cursor_container(const uint64_t ten
partition_arrays,
excluded_zones,
excluded_servers,
cursor_container))) {
cursor_container,
stop_server_info_set,
force))) {
LOG_WARN("fail to do build leader switch cursor container", K(ret));
}
return ret;
@ -3760,9 +3890,9 @@ int ObLeaderCoordinator::build_pg_array_leader_candidates(
return ret;
}
int ObLeaderCoordinator::coordinate_partition_arrays(ExpectedLeaderWaitOperator& leader_wait_operator,
ObArray<PartitionArray*>& partition_arrays, TenantUnit& tenant_unit, CursorContainer& cursor_container,
const bool force)
int ObLeaderCoordinator::coordinate_partition_arrays(ExpectedLeaderWaitOperator &leader_wait_operator,
ObArray<PartitionArray *> &partition_arrays, TenantUnit &tenant_unit, CursorContainer &cursor_container,
const bool has_stop_server, const bool force)
{
int ret = OB_SUCCESS;
// excluded_zones and excluded_servers can be empty
@ -3774,7 +3904,7 @@ int ObLeaderCoordinator::coordinate_partition_arrays(ExpectedLeaderWaitOperator&
LOG_WARN("partition_arrays is empty", K(partition_arrays), K(ret));
} else {
ConcurrentSwitchLeaderStrategy switch_strategy(*this, partition_arrays, cursor_container);
if (OB_FAIL(switch_strategy.execute(tenant_unit, leader_wait_operator, force))) {
if (OB_FAIL(switch_strategy.execute(tenant_unit, leader_wait_operator, has_stop_server, force))) {
LOG_WARN("fail to execute switch", K(ret));
} else if (OB_FAIL(leader_wait_operator.finally_wait())) {
LOG_WARN("leader wait operator try wait failed", K(ret));
@ -3821,8 +3951,8 @@ uint32_t ObLeaderCoordinator::get_random()
return static_cast<uint32_t>(ObRandom::rand(min, max));
}
int ObLeaderCoordinator::get_cur_partition_leader(
const ObIArray<Partition>& partitions, common::ObAddr& leader, bool& has_same_leader)
int ObLeaderCoordinator::get_cur_partition_leader(const ObIArray<Partition> &partitions, common::ObAddr &leader,
bool &has_same_leader, const StopServerInfoSet &stop_server_info_set, bool &has_leader_in_stop_server)
{
int ret = OB_SUCCESS;
ObAddr first_leader;
@ -3830,6 +3960,7 @@ int ObLeaderCoordinator::get_cur_partition_leader(
leader.reset();
has_same_leader = true;
has_leader_in_stop_server = false;
if (!inited_) {
ret = OB_NOT_INIT;
@ -3853,6 +3984,27 @@ int ObLeaderCoordinator::get_cur_partition_leader(
}
}
}
if (OB_SUCC(ret) && 0 != stop_server_info_set.size()) {
int tmp_ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCCESS == tmp_ret && !has_leader_in_stop_server && i < partitions.count(); ++i) {
if (OB_SUCCESS != (tmp_ret = partitions.at(i).get_leader(tmp_leader))) {
if (OB_LEADER_NOT_EXIST != tmp_ret) {
LOG_WARN("failed to get leader", KR(tmp_ret));
} else {
tmp_ret = OB_SUCCESS;
}
} else if (OB_SUCCESS != (tmp_ret = stop_server_info_set.exist_refactored(tmp_leader))) {
if (OB_HASH_EXIST == tmp_ret) {
has_leader_in_stop_server = true;
tmp_ret = OB_SUCCESS;
} else if (OB_HASH_NOT_EXIST == tmp_ret) {
tmp_ret = OB_SUCCESS;
} else {
LOG_WARN("fail to check leader in stop server info set", KR(tmp_ret), K(tmp_leader));
}
}
}
}
if (OB_SUCC(ret) && has_same_leader) {
if (!first_leader.is_valid()) {
@ -3869,9 +4021,9 @@ int ObLeaderCoordinator::get_cur_partition_leader(
return ret;
}
int ObLeaderCoordinator::update_candidate_leader_info(const Partition& partition,
const share::ObPartitionReplica& replica, const int64_t replica_index, const TenantUnit& tenant_unit,
CandidateLeaderInfo& info, int64_t& zone_migrate_out_or_transform_count, bool& is_ignore_switch_percent)
int ObLeaderCoordinator::update_candidate_leader_info(const Partition &partition,
const share::ObPartitionReplica &replica, const int64_t replica_index, const TenantUnit &tenant_unit,
CandidateLeaderInfo &info, int64_t &zone_migrate_out_or_transform_count)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
@ -3906,10 +4058,6 @@ int ObLeaderCoordinator::update_candidate_leader_info(const Partition& partition
} else {
++info.in_normal_unit_count_;
}
} else if (replica.is_leader_by_election()) {
// When the main unit is not in the normal unit,
// it needs to be forced to switch
is_ignore_switch_percent = true;
}
if (OB_SUCC(ret)) {
@ -3929,15 +4077,14 @@ int ObLeaderCoordinator::update_candidate_leader_info(const Partition& partition
return ret;
}
int ObLeaderCoordinator::build_partition_statistic(const PartitionArray& partitions, const TenantUnit& tenant_unit,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers,
CandidateLeaderInfoMap& leader_info_map, bool& is_ignore_switch_percent)
int ObLeaderCoordinator::build_partition_statistic(const PartitionArray &partitions, const TenantUnit &tenant_unit,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
CandidateLeaderInfoMap &leader_info_map)
{
int ret = OB_SUCCESS;
CandidateLeaderInfo info;
const int64_t bucket_num = 64;
const int overwrite_flag = 1;
is_ignore_switch_percent = false;
ObRandomZoneSelector random_selector;
ObSEArray<ObRawPrimaryZoneUtil::ZoneScore, MAX_ZONE_NUM, ObNullAllocator> zone_score_array;
// we deem region num is no more than zone num, so max zone num is enough
@ -3992,13 +4139,8 @@ int ObLeaderCoordinator::build_partition_statistic(const PartitionArray& partiti
LOG_WARN("fail to locate candidate leader info", K(ret));
} else if (OB_FAIL(zone_migrate_count_map.locate(replica_zone, zone_migrate_out_or_transform_cnt))) {
LOG_WARN("fail to locate zone migrate or transform cnt", K(ret));
} else if (OB_FAIL(update_candidate_leader_info(partitions.at(i),
replica,
j,
tenant_unit,
info,
zone_migrate_out_or_transform_cnt,
is_ignore_switch_percent))) {
} else if (OB_FAIL(update_candidate_leader_info(
partitions.at(i), replica, j, tenant_unit, info, zone_migrate_out_or_transform_cnt))) {
LOG_WARN("fail to update candidate leader info", K(ret));
} else if (OB_FAIL(leader_info_map.set_refactored(replica.server_, info, overwrite_flag))) {
LOG_WARN("failed to set leader info", K(ret));
@ -6699,13 +6841,13 @@ int ObLeaderCoordinator::get_excluded_zone_array(common::ObIArray<common::ObZone
return ret;
}
int ObLeaderCoordinator::SwitchLeaderStrategy::check_before_coordinate_partition(const Partition& partition,
const TenantUnit& tenant_unit, const bool force, const ObPartitionKey& part_key, ObAddr& advised_leader,
ObAddr& cur_leader, bool& need_switch)
int ObLeaderCoordinator::SwitchLeaderStrategy::check_before_coordinate_partition(const Partition &partition,
const TenantUnit &tenant_unit, const bool force, const ObPartitionKey &part_key, ObAddr &advised_leader,
ObAddr &cur_leader, bool &need_switch, const bool has_stop_server)
{
int ret = OB_SUCCESS;
cur_leader.reset();
need_switch = (force || host_.config_->enable_auto_leader_switch);
need_switch = (force || host_.config_->enable_auto_leader_switch || has_stop_server);
// check whether current leader exist
if (need_switch) {
if (OB_UNLIKELY(nullptr == partition.info_)) {
@ -6821,10 +6963,10 @@ int ObLeaderCoordinator::SwitchLeaderStrategy::check_before_coordinate_partition
return ret;
}
int ObLeaderCoordinator::SwitchLeaderStrategy::coordinate_partitions_per_tg(TenantUnit& tenant_unit,
PartitionArray& partition_array, PartitionArrayCursor& cursor, const bool force,
ExpectedLeaderWaitOperator& leader_wait_operator, SwitchLeaderListAsyncOperator& async_rpc_operator,
bool& do_switch_leader)
int ObLeaderCoordinator::SwitchLeaderStrategy::coordinate_partitions_per_tg(TenantUnit &tenant_unit,
PartitionArray &partition_array, PartitionArrayCursor &cursor, const bool force,
ExpectedLeaderWaitOperator &leader_wait_operator, SwitchLeaderListAsyncOperator &async_rpc_operator,
bool &do_switch_leader, const bool has_stop_server)
{
int ret = OB_SUCCESS;
ObPartitionKey first_part_key;
@ -6862,8 +7004,14 @@ int ObLeaderCoordinator::SwitchLeaderStrategy::coordinate_partitions_per_tg(Tena
LOG_WARN("rs is stopped", K(ret));
} else if (OB_FAIL(partition_array.at(i).get_partition_key(part_key))) {
LOG_WARN("fail to get partition key", K(ret));
} else if (OB_FAIL(check_before_coordinate_partition(
partition_array.at(i), tenant_unit, force, part_key, new_leader, cur_leader, need_switch))) {
} else if (OB_FAIL(check_before_coordinate_partition(partition_array.at(i),
tenant_unit,
force,
part_key,
new_leader,
cur_leader,
need_switch,
has_stop_server))) {
LOG_WARN("failed to check before coordinate partition", K(ret), K(i));
} else if (!need_switch) {
const bool is_new_leader = false;
@ -7182,8 +7330,8 @@ int ObLeaderCoordinator::ConcurrentSwitchLeaderStrategy::try_reconnect_cursor_qu
return ret;
}
int ObLeaderCoordinator::ConcurrentSwitchLeaderStrategy::execute(
TenantUnit& tenant_unit, ExpectedLeaderWaitOperator& leader_wait_operator, const bool force)
int ObLeaderCoordinator::ConcurrentSwitchLeaderStrategy::execute(TenantUnit &tenant_unit,
ExpectedLeaderWaitOperator &leader_wait_operator, const bool has_stop_server, const bool force)
{
int ret = OB_SUCCESS;
if (OB_FAIL(init_cursor_queue_map())) {
@ -7248,7 +7396,7 @@ int ObLeaderCoordinator::ConcurrentSwitchLeaderStrategy::execute(
LOG_WARN("partition array ptr is null", K(ret));
} else if (cursor_link->cursor_ptr_->switch_start()) {
need_switch = true;
} else if (cursor_link->cursor_ptr_->ignore_switch_percent_ || force) {
} else if (force) {
need_switch = true;
} else {
ObSEArray<common::ObAddr, 1> candidate_leaders;
@ -7280,7 +7428,8 @@ int ObLeaderCoordinator::ConcurrentSwitchLeaderStrategy::execute(
force,
leader_wait_operator,
async_rpc_operator,
do_switch_leader))) {
do_switch_leader,
has_stop_server))) {
LOG_WARN("fail to coordinate partition per tg", K(ret));
} else if (!do_switch_leader) {
// can't cut, this turn has reached the upper limit

View File

@ -268,6 +268,8 @@ class ObLeaderCoordinator : public ObILeaderCoordinator, public ObRsReentrantThr
public:
static const int64_t WAIT_SWITCH_LEADER_TIMEOUT = 16 * 1000 * 1000; // 16s
static const int64_t ALL_PARTITIONS = -1;
static const int64_t BUCKET_NUM = 1000;
typedef common::hash::ObHashSet<common::ObAddr> StopServerInfoSet;
ObLeaderCoordinator();
virtual ~ObLeaderCoordinator();
@ -290,9 +292,9 @@ public:
virtual int check_daily_merge_switch_leader(
const common::ObIArray<common::ObZone>& zone_list, common::ObIArray<bool>& results) override;
virtual int coordinate() override;
virtual int coordinate_tenants(const common::ObArray<common::ObZone>& excluded_zones,
const common::ObIArray<common::ObAddr>& excluded_servers, const common::ObIArray<uint64_t>& tenant_ids,
const bool force = false);
virtual int coordinate_tenants(const common::ObArray<common::ObZone> &excluded_zones,
const common::ObIArray<common::ObAddr> &excluded_servers, const common::ObIArray<uint64_t> &tenant_ids,
const StopServerInfoSet &stop_server_info_set, const bool force = false);
void set_merge_status(bool is_in_merging) override
{
is_in_merging_ = is_in_merging;
@ -1192,20 +1194,17 @@ private:
};
struct PartitionArrayCursor {
PartitionArrayCursor()
: part_idx_(0), part_cnt_(0), array_idx_(0), cur_leader_(), advised_leader_(), ignore_switch_percent_(false)
PartitionArrayCursor() : part_idx_(0), part_cnt_(0), array_idx_(0), cur_leader_(), advised_leader_()
{}
PartitionArrayCursor(int64_t part_idx, int64_t part_cnt, int64_t array_idx, const common::ObAddr& cur_leader,
const common::ObAddr& advised_leader, bool ignore_switch_percent)
PartitionArrayCursor(int64_t part_idx, int64_t part_cnt, int64_t array_idx, const common::ObAddr &cur_leader,
const common::ObAddr &advised_leader)
: part_idx_(part_idx),
part_cnt_(part_cnt),
array_idx_(array_idx),
cur_leader_(cur_leader),
advised_leader_(advised_leader),
ignore_switch_percent_(ignore_switch_percent)
advised_leader_(advised_leader)
{}
TO_STRING_KV(
K(part_idx_), K(part_cnt_), K(array_idx_), K(cur_leader_), K(advised_leader_), K(ignore_switch_percent_));
TO_STRING_KV(K(part_idx_), K(part_cnt_), K(array_idx_), K(cur_leader_), K(advised_leader_));
bool switch_finish()
{
return part_idx_ >= part_cnt_;
@ -1224,7 +1223,6 @@ private:
int64_t array_idx_;
common::ObAddr cur_leader_;
common::ObAddr advised_leader_;
bool ignore_switch_percent_;
};
struct CursorContainer {
public:
@ -1324,17 +1322,17 @@ private:
{}
public:
virtual int execute(
TenantUnit& tenant_unit, ExpectedLeaderWaitOperator& leader_wait_operator, const bool force) = 0;
virtual int execute(TenantUnit &tenant_unit, ExpectedLeaderWaitOperator &leader_wait_operator, const bool force,
const bool has_stop_server) = 0;
protected:
int coordinate_partitions_per_tg(TenantUnit& tenant_unit, PartitionArray& partition_array,
PartitionArrayCursor& cursor, const bool force, ExpectedLeaderWaitOperator& leader_wait_operator,
SwitchLeaderListAsyncOperator& async_rpc_operator, bool& do_switch_leader);
int check_before_coordinate_partition(const Partition& partition, const TenantUnit& tenant_unit, const bool force,
const common::ObPartitionKey& part_key, common::ObAddr& advised_leader, common::ObAddr& cur_leader,
bool& need_switch);
int check_tenant_on_server(const TenantUnit& tenant_unit, const common::ObAddr& server, bool& tenant_on_server);
int coordinate_partitions_per_tg(TenantUnit &tenant_unit, PartitionArray &partition_array,
PartitionArrayCursor &cursor, const bool force, ExpectedLeaderWaitOperator &leader_wait_operator,
SwitchLeaderListAsyncOperator &async_rpc_operator, bool &do_switch_leader, const bool has_stop_server);
int check_before_coordinate_partition(const Partition &partition, const TenantUnit &tenant_unit, const bool force,
const common::ObPartitionKey &part_key, common::ObAddr &advised_leader, common::ObAddr &cur_leader,
bool &need_switch, const bool has_stop_server);
int check_tenant_on_server(const TenantUnit &tenant_unit, const common::ObAddr &server, bool &tenant_on_server);
protected:
ObLeaderCoordinator& host_;
@ -1382,7 +1380,8 @@ private:
{}
public:
virtual int execute(TenantUnit& tenant_unit, ExpectedLeaderWaitOperator& leader_wait_operator, const bool force);
virtual int execute(TenantUnit &tenant_unit, ExpectedLeaderWaitOperator &leader_wait_operator, const bool force,
const bool has_stop_server);
private:
int init_cursor_queue_map();
@ -1400,10 +1399,11 @@ private:
};
int auto_coordinate();
int get_auto_leader_switch_idle_interval(int64_t& idle_interval_us) const;
int coordinate_helper(PartitionInfoContainer& partition_info_container,
const common::ObArray<common::ObZone>& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers,
const common::ObIArray<uint64_t>& tenant_ids, const bool force = false);
int get_auto_leader_switch_idle_interval(int64_t &idle_interval_us) const;
int coordinate_helper(PartitionInfoContainer &partition_info_container,
const common::ObArray<common::ObZone> &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
const common::ObIArray<uint64_t> &tenant_ids, const StopServerInfoSet &stop_server_info_set,
const bool force = false);
int move_sys_tenant_to_last(const common::ObIArray<uint64_t>& tenant_ids, common::ObIArray<uint64_t>& new_tenant_ids);
@ -1427,44 +1427,47 @@ private:
PartitionInfoContainer& partition_info_container, const common::ObIArray<uint64_t>& table_ids,
const bool small_tenant, common::ObIAllocator& partition_allocator, TablegroupPartition& tg_partition,
const int64_t partition_idx);
int fill_partition_tg_id(share::schema::ObSchemaGetterGuard& schema_guard, const uint64_t table_id,
const bool small_tenant, Partition& partition);
int append_to_tg_partition(share::schema::ObSchemaGetterGuard& schema_guard, const bool is_single_partition_group,
const int64_t partition_idx, TablegroupPartition& tg_partition, const Partition& partition,
common::ObIAllocator& partition_allocator, const int64_t partition_array_capacity);
int move_all_core_last(TenantPartition& tenant_partition);
int get_tenant_unit(const uint64_t tenant_id, TenantUnit& tenant_unit);
int build_leader_balance_info(share::schema::ObSchemaGetterGuard& schema_guard, const uint64_t tenant_id,
LcBalanceGroupContainer& balance_group_container, TenantPartition& tenant_partition, TenantUnit& tenant_unit);
int coordinate_partition_arrays(ExpectedLeaderWaitOperator& leader_wait_operator,
common::ObArray<PartitionArray*>& partition_arrays, TenantUnit& tenant_unit, CursorContainer& cursor_container,
const bool force = false);
int build_single_pg_leader_candidates(GetLeaderCandidatesAsyncV2Operator& async_rpc_operator,
const int64_t in_array_index, PartitionArray& partitions, common::ObIArray<PartitionArray*>& partition_arrays);
int get_partition_prep_candidates(const common::ObAddr& leader,
const common::ObIArray<share::ObRawPrimaryZoneUtil::ZoneScore>& zone_score_array,
const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore>& region_score_array, Partition& partition,
ObIArray<common::ObAddr>& prep_partitions);
int get_high_score_regions(const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore>& region_score_array,
common::ObIArray<common::ObRegion>& high_score_regions);
int fill_partition_tg_id(share::schema::ObSchemaGetterGuard &schema_guard, const uint64_t table_id,
const bool small_tenant, Partition &partition);
int append_to_tg_partition(share::schema::ObSchemaGetterGuard &schema_guard, const bool is_single_partition_group,
const int64_t partition_idx, TablegroupPartition &tg_partition, const Partition &partition,
common::ObIAllocator &partition_allocator, const int64_t partition_array_capacity);
int move_all_core_last(TenantPartition &tenant_partition);
int get_tenant_unit(const uint64_t tenant_id, TenantUnit &tenant_unit);
int build_leader_balance_info(share::schema::ObSchemaGetterGuard &schema_guard, const uint64_t tenant_id,
LcBalanceGroupContainer &balance_group_container, TenantPartition &tenant_partition, TenantUnit &tenant_unit);
int coordinate_partition_arrays(ExpectedLeaderWaitOperator &leader_wait_operator,
common::ObArray<PartitionArray *> &partition_arrays, TenantUnit &tenant_unit, CursorContainer &cursor_container,
const bool has_stop_server, const bool force = false);
int build_single_pg_leader_candidates(GetLeaderCandidatesAsyncV2Operator &async_rpc_operator,
const int64_t in_array_index, PartitionArray &partitions, common::ObIArray<PartitionArray *> &partition_arrays);
int get_partition_prep_candidates(const common::ObAddr &leader,
const common::ObIArray<share::ObRawPrimaryZoneUtil::ZoneScore> &zone_score_array,
const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore> &region_score_array, Partition &partition,
ObIArray<common::ObAddr> &prep_partitions);
int get_high_score_regions(const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore> &region_score_array,
common::ObIArray<common::ObRegion> &high_score_regions);
static bool prep_candidates_match(
ObArray<common::ObAddr>& prev_prep_candidates, ObSArray<common::ObAddr>& this_prep_candidates);
int do_coordinate_partitions(const PartitionArray& partitions, const TenantUnit& tenant_unit,
const common::ObAddr& advised_leader, const bool force, ExpectedLeaderWaitOperator& leader_wait_operator);
// check advise leader can be switch, may update advise leader if needed.
int build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit& tenant_unit,
common::ObArray<PartitionArray*>& partition_arrays, CursorContainer& cursor_array,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers, const bool force);
int build_pre_switch_pg_index_array(const uint64_t tenant_id, TenantUnit& tenant_unit,
common::ObArray<PartitionArray*>& partition_arrays, common::ObIArray<PreSwitchPgInfo>& pre_switch_index_array,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers);
int build_pg_array_leader_candidates(const common::ObIArray<PreSwitchPgInfo>& pre_switch_index_array,
common::ObArray<PartitionArray*>& partition_arrays);
int do_build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit& tenant_unit,
const common::ObIArray<PreSwitchPgInfo>& pre_switch_index_array,
common::ObArray<PartitionArray*>& partition_arrays, const ObZoneList& excluded_zones,
const common::ObIArray<common::ObAddr>& excluded_servers, CursorContainer& cursor_container);
int build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit &tenant_unit,
common::ObArray<PartitionArray *> &partition_arrays, CursorContainer &cursor_array,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
const StopServerInfoSet &stop_server_info_set, const bool force);
int build_pre_switch_pg_index_array(const uint64_t tenant_id, TenantUnit &tenant_unit,
common::ObArray<PartitionArray *> &partition_arrays, common::ObIArray<PreSwitchPgInfo> &pre_switch_index_array,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
const StopServerInfoSet &stop_server_info_set, const bool force);
int build_pg_array_leader_candidates(const common::ObIArray<PreSwitchPgInfo> &pre_switch_index_array,
common::ObArray<PartitionArray *> &partition_arrays);
int do_build_leader_switch_cursor_container(const uint64_t tenant_id, TenantUnit &tenant_unit,
const common::ObIArray<PreSwitchPgInfo> &pre_switch_index_array,
common::ObArray<PartitionArray *> &partition_arrays, const ObZoneList &excluded_zones,
const common::ObIArray<common::ObAddr> &excluded_servers, CursorContainer &cursor_container,
const StopServerInfoSet &stop_server_info_set, const bool force);
// replica's server same with unit server and unit not migrating
static int check_in_normal_unit(
const share::ObPartitionReplica& replica, const TenantUnit& tenant_unit, bool& in_normal_unit);
@ -1473,15 +1476,15 @@ private:
// remove replicas not alive
int remove_lost_replicas(share::ObPartitionInfo& partition_info);
// compare by tuple (tablegroup_id, table_id)
static bool partition_entity_cmp(const IPartitionEntity* l, const IPartitionEntity* r);
int get_cur_partition_leader(
const common::ObIArray<Partition>& partitions, common::ObAddr& leader, bool& has_same_leader);
int build_partition_statistic(const PartitionArray& partitions, const TenantUnit& tenant_unit,
const ObZoneList& excluded_zones, const common::ObIArray<common::ObAddr>& excluded_servers,
CandidateLeaderInfoMap& leader_info_map, bool& is_ignore_switch_percent);
int update_candidate_leader_info(const Partition& partition, const share::ObPartitionReplica& replica,
const int64_t replica_index, const TenantUnit& tenant_unit, CandidateLeaderInfo& info,
int64_t& zone_migrate_out_or_transform_count, bool& is_ignore_switch_percent);
static bool partition_entity_cmp(const IPartitionEntity *l, const IPartitionEntity *r);
int get_cur_partition_leader(const common::ObIArray<Partition> &partitions, common::ObAddr &leader,
bool &has_same_leader, const StopServerInfoSet &stop_server_info_set, bool &has_leader_in_stop_server);
int build_partition_statistic(const PartitionArray &partitions, const TenantUnit &tenant_unit,
const ObZoneList &excluded_zones, const common::ObIArray<common::ObAddr> &excluded_servers,
CandidateLeaderInfoMap &leader_info_map);
int update_candidate_leader_info(const Partition &partition, const share::ObPartitionReplica &replica,
const int64_t replica_index, const TenantUnit &tenant_unit, CandidateLeaderInfo &info,
int64_t &zone_migrate_out_or_transform_count);
int choose_leader(const uint64_t tenant_id, const uint64_t tablegroup_id,
const CandidateLeaderInfoMap& leader_info_map, common::ObIArray<common::ObAddr>& candidate_leaders,
const common::ObAddr& cur_leader);
@ -1539,11 +1542,14 @@ private:
const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore>& region_score_array,
const PartitionArray& partition_array, CandidateZoneInfoMap& candidate_zone_map);
int update_candidate_zone_info_map(
const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore>& region_score_array,
const common::ObAddr& candidate, CandidateZoneInfoMap& candidate_zone_map);
int do_check_daily_merge_switch_leader_by_pg(const PartitionArray& partition_array,
const CandidateZoneInfoMap& candidate_zone_map, const common::ObIArray<common::ObZone>& zone_list,
common::ObIArray<bool>& results);
const common::ObIArray<share::ObRawPrimaryZoneUtil::RegionScore> &region_score_array,
const common::ObAddr &candidate, CandidateZoneInfoMap &candidate_zone_map);
int do_check_daily_merge_switch_leader_by_pg(const PartitionArray &partition_array,
const CandidateZoneInfoMap &candidate_zone_map, const common::ObIArray<common::ObZone> &zone_list,
common::ObIArray<bool> &results);
int build_stop_server_info_set(StopServerInfoSet &stop_server_info_set, bool &skip_switch_leader);
int init_stop_server_info_set(StopServerInfoSet &stop_server_info_set);
int check_has_server_stopped(bool &has_stopped_server);
private:
bool inited_;

View File

@ -6821,10 +6821,6 @@ int ObRootService::stop_server(const obrpc::ObAdminServerArg& arg)
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(arg), K(ret));
} else if (!config_->enable_auto_leader_switch) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("cannot stop server when auto leader switchover disabled", K(ret));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "Stop server when auto leader switchover is disabled");
} else if (OB_FAIL(get_readwrite_servers(arg.servers_, readwrite_servers))) {
LOG_WARN("fail to get readwrite servers", K(ret));
} else if (readwrite_servers.count() <= 0) {
@ -7103,10 +7099,6 @@ int ObRootService::stop_zone(const obrpc::ObAdminZoneArg& arg)
} else if (!arg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(arg), K(ret));
} else if (!config_->enable_auto_leader_switch) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("cannot stop zone when auto leader switchover is disabled", K(ret));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "Stop zone when auto leader switchover is disabled");
} else if (OB_FAIL(zone_manager_.check_zone_exist(arg.zone_, zone_exist))) {
LOG_WARN("fail to check zone exist", K(ret));
} else if (!zone_exist) {

View File

@ -676,6 +676,9 @@ int ObAdminSwitchReplicaRole::switch_zone_to_leader(const ObZone& zone, const ui
ObArray<ObZone> excluded_zones;
ObArray<ObAddr> excluded_servers;
ObArray<uint64_t> tenant_ids;
ObLeaderCoordinator::StopServerInfoSet stop_server_info_set;
const int64_t BUCKET_NUM = 1000;
const int64_t bucket_num = hash::cal_next_prime(BUCKET_NUM);
HEAP_VAR(ObGlobalInfo, global_info)
{
ObArray<ObZoneInfo> infos;
@ -702,8 +705,10 @@ int ObAdminSwitchReplicaRole::switch_zone_to_leader(const ObZone& zone, const ui
ObAddr invalid_server;
if (OB_FAIL(get_switch_replica_tenants(zone, invalid_server, tenant_id, tenant_ids))) {
LOG_WARN("get tenants of zone failed", K(zone), K(invalid_server), K(tenant_id), K(ret));
} else if (OB_FAIL(stop_server_info_set.create(bucket_num))) {
LOG_WARN("fail to create stop server info set", KR(ret), K(bucket_num));
} else if (OB_FAIL(ctx_.leader_coordinator_->coordinate_tenants(
excluded_zones, excluded_servers, tenant_ids, force))) {
excluded_zones, excluded_servers, tenant_ids, stop_server_info_set, force))) {
LOG_WARN("switch zone replica role to follower failed",
K(excluded_zones),
K(excluded_servers),
@ -722,6 +727,9 @@ int ObAdminSwitchReplicaRole::switch_replica_by_server(
{
int ret = OB_SUCCESS;
bool is_alive = false;
ObLeaderCoordinator::StopServerInfoSet stop_server_info_set;
const int64_t BUCKET_NUM = 1000;
const int64_t bucket_num = hash::cal_next_prime(BUCKET_NUM);
if (!ctx_.is_inited()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -747,8 +755,10 @@ int ObAdminSwitchReplicaRole::switch_replica_by_server(
LOG_WARN("get tenants of server failed", K(invalid_zone), K(server), K(tenant_id), K(ret));
} else if (OB_FAIL(excluded_servers.push_back(server))) {
LOG_WARN("push back server failed", K(ret));
} else if (OB_FAIL(
ctx_.leader_coordinator_->coordinate_tenants(excluded_zones, excluded_servers, tenant_ids, force))) {
} else if (OB_FAIL(stop_server_info_set.create(bucket_num))) {
LOG_WARN("fail to create stop server info set", KR(ret), K(bucket_num));
} else if (OB_FAIL(ctx_.leader_coordinator_->coordinate_tenants(
excluded_zones, excluded_servers, tenant_ids, stop_server_info_set, force))) {
LOG_WARN("switch server replica role to follower failed", K(server), K(tenant_ids), K(force), K(ret));
}
} else {
@ -762,6 +772,9 @@ int ObAdminSwitchReplicaRole::switch_replica_by_zone(const ObZone& zone, const u
{
int ret = OB_SUCCESS;
bool zone_exist = false;
ObLeaderCoordinator::StopServerInfoSet stop_server_info_set;
const int64_t BUCKET_NUM = 1000;
const int64_t bucket_num = hash::cal_next_prime(BUCKET_NUM);
if (!ctx_.is_inited()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -788,8 +801,10 @@ int ObAdminSwitchReplicaRole::switch_replica_by_zone(const ObZone& zone, const u
LOG_WARN("get coordinate_tenants failed", K(zone), K(invalid_server), K(tenant_id), K(ret));
} else if (OB_FAIL(excluded_zones.push_back(zone))) {
LOG_WARN("push back zone failed", K(ret));
} else if (OB_FAIL(
ctx_.leader_coordinator_->coordinate_tenants(excluded_zones, excluded_servers, tenant_ids, force))) {
} else if (OB_FAIL(stop_server_info_set.create(bucket_num))) {
LOG_WARN("fail to create stop server info set", KR(ret), K(bucket_num));
} else if (OB_FAIL(ctx_.leader_coordinator_->coordinate_tenants(
excluded_zones, excluded_servers, tenant_ids, stop_server_info_set, force))) {
LOG_WARN("switch zone replica role to follower failed", K(zone), K(tenant_ids), K(force), K(ret));
}
} else {

View File

@ -115,21 +115,19 @@ int ObAllVirtualLeaderStat::build_partition_group_candidate_leader_array()
K(partition_group_idx_),
"tenant partition group cnt",
tenant_partition_.count());
} else if (OB_ISNULL(leader_coordinator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("leader_coordinator_ is null", KR(ret));
} else {
ObLeaderCoordinator::PartitionArray& partitions = *tenant_partition_.at(partition_group_idx_);
ObLeaderCoordinator::CandidateLeaderInfoMap leader_info_map(*leader_coordinator_);
const int64_t MAP_BUCKET_NUM = 64;
bool is_ignore_switch_percent = false;
const common::ObArray<common::ObZone> exclude_zones;
const common::ObArray<common::ObAddr> exclude_servers;
if (OB_FAIL(leader_info_map.create(MAP_BUCKET_NUM, ObModIds::OB_RS_LEADER_COORDINATOR))) {
LOG_WARN("fail to create leader info map", K(ret));
} else if (OB_FAIL(leader_coordinator_->build_partition_statistic(partitions,
tenant_unit_,
exclude_zones,
exclude_servers,
leader_info_map,
is_ignore_switch_percent))) {
} else if (OB_FAIL(leader_coordinator_->build_partition_statistic(
partitions, tenant_unit_, exclude_zones, exclude_servers, leader_info_map))) {
LOG_WARN("fail to build partition statistic", K(ret));
} else if (OB_FAIL(leader_coordinator_->update_leader_candidates(partitions, leader_info_map))) {
LOG_WARN("fail to update leader candidates", K(ret));