From 185d056d52e2dd81be9c5d568e47e1f7a43e9786 Mon Sep 17 00:00:00 2001 From: linqiucen Date: Wed, 30 Aug 2023 12:14:02 +0000 Subject: [PATCH] new rs job related to tenant LOCALITY, PRIMARY ZONE, UNIT_NUM --- .../test_standby_balance_ls_group.cpp | 5 +- src/rootserver/CMakeLists.txt | 3 +- .../ob_alter_locality_finish_checker.cpp | 64 +++- .../ob_alter_locality_finish_checker.h | 12 +- .../ob_alter_primary_zone_checker.cpp | 221 +++++++++++ .../ob_alter_primary_zone_checker.h | 45 +++ src/rootserver/ob_balance_ls_primary_zone.cpp | 150 ++++++-- src/rootserver/ob_balance_ls_primary_zone.h | 13 +- src/rootserver/ob_ddl_service.cpp | 196 +++++----- src/rootserver/ob_ddl_service.h | 8 +- src/rootserver/ob_ls_service_helper.cpp | 11 +- src/rootserver/ob_ls_service_helper.h | 5 +- src/rootserver/ob_recovery_ls_service.cpp | 4 +- src/rootserver/ob_root_utils.cpp | 67 ++++ src/rootserver/ob_root_utils.h | 7 + .../ob_rootservice_util_checker.cpp | 24 +- src/rootserver/ob_rootservice_util_checker.h | 6 +- src/rootserver/ob_rs_job_table_operator.cpp | 120 +++--- src/rootserver/ob_rs_job_table_operator.h | 40 +- src/rootserver/ob_server_manager.cpp | 31 +- src/rootserver/ob_server_zone_op_service.cpp | 35 +- ...b_shrink_expand_resource_pool_checker.cpp} | 142 ++++++-- ... 
ob_shrink_expand_resource_pool_checker.h} | 18 +- src/rootserver/ob_tenant_balance_service.cpp | 197 ++++++++-- src/rootserver/ob_tenant_balance_service.h | 10 +- src/rootserver/ob_unit_manager.cpp | 342 ++++++++++++++---- src/rootserver/ob_unit_manager.h | 31 +- src/share/ob_debug_sync_point.h | 3 + src/share/ob_tenant_info_proxy.cpp | 82 +++-- src/share/ob_tenant_info_proxy.h | 14 +- .../rootserver/test_rs_job_table_operator.cpp | 4 +- 31 files changed, 1429 insertions(+), 481 deletions(-) create mode 100644 src/rootserver/ob_alter_primary_zone_checker.cpp create mode 100644 src/rootserver/ob_alter_primary_zone_checker.h rename src/rootserver/{ob_shrink_resource_pool_checker.cpp => ob_shrink_expand_resource_pool_checker.cpp} (60%) rename src/rootserver/{ob_shrink_resource_pool_checker.h => ob_shrink_expand_resource_pool_checker.h} (80%) diff --git a/mittest/simple_server/test_standby_balance_ls_group.cpp b/mittest/simple_server/test_standby_balance_ls_group.cpp index c3a62a8b8..340235bbd 100644 --- a/mittest/simple_server/test_standby_balance_ls_group.cpp +++ b/mittest/simple_server/test_standby_balance_ls_group.cpp @@ -97,7 +97,8 @@ TEST_F(TestStandbyBalance, BalanceLSGroup) ASSERT_EQ(OB_SUCCESS, sql_proxy.write(OB_SYS_TENANT_ID, sql.ptr(), affected_row)); ASSERT_EQ(1, affected_row); ObTenantLSInfo tenant_stat(get_curr_observer().get_gctx().sql_proxy_, &tenant_schema1, tenant_id_); - ASSERT_EQ(OB_SUCCESS, ObLSServiceHelper::balance_ls_group(tenant_stat)); + bool is_balanced = false; + ASSERT_EQ(OB_SUCCESS, ObLSServiceHelper::balance_ls_group(true /*need_execute_balance*/, tenant_stat, is_balanced)); ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select ls_id, ls_group_id, unit_group_id from __all_ls_status where ls_id != 1 order by ls_id")); int64_t ls_id = 0; uint64_t ls_group_id = 0; @@ -144,7 +145,7 @@ TEST_F(TestStandbyBalance, BalanceLSGroup) ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("insert into __all_unit(unit_id, unit_group_id, resource_pool_id, zone, svr_ip, 
svr_port, migrate_from_svr_ip, migrate_from_svr_port, status) values(%lu, %lu, 1, 'z1', '127.0.0.1', 2882, ' ', 0, 'ACTIVE')", u2, u2)); ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_row)); ASSERT_EQ(1, affected_row); - ASSERT_EQ(OB_SUCCESS, ObLSServiceHelper::balance_ls_group(tenant_stat)); + ASSERT_EQ(OB_SUCCESS, ObLSServiceHelper::balance_ls_group(true /*need_execute_balance*/, tenant_stat, is_balanced)); ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select ls_id, ls_group_id, unit_group_id from __all_ls_status where ls_id != 1 order by ls_id")); SMART_VAR(ObMySQLProxy::MySQLResult, res1) { ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res1, sql.ptr())); diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index 4af70da5c..29b1b8db5 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -33,6 +33,7 @@ ob_set_subtarget(ob_rootserver common ob_disaster_recovery_task_executor.cpp ob_migrate_unit_finish_checker.cpp ob_rootservice_util_checker.cpp + ob_alter_primary_zone_checker.cpp ob_all_server_checker.cpp ob_all_server_task.cpp ob_balance_info.cpp @@ -94,7 +95,7 @@ ob_set_subtarget(ob_rootserver common ob_lob_piece_builder.cpp ob_lob_meta_builder.cpp ob_ls_recovery_stat_handler.cpp - ob_shrink_resource_pool_checker.cpp + ob_shrink_expand_resource_pool_checker.cpp ) ob_set_subtarget(ob_rootserver balance diff --git a/src/rootserver/ob_alter_locality_finish_checker.cpp b/src/rootserver/ob_alter_locality_finish_checker.cpp index 316effeb1..9ec94b317 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.cpp +++ b/src/rootserver/ob_alter_locality_finish_checker.cpp @@ -26,7 +26,7 @@ using namespace common; using namespace share; namespace rootserver { -OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_); +OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_, rs_job_id_, rs_job_check_ret_); ObAlterLocalityFinishChecker::ObAlterLocalityFinishChecker(volatile bool &stop) : 
inited_(false), @@ -100,21 +100,42 @@ int ObAlterLocalityFinishChecker::check() //STEP 2: check each tenant whether finish alter locality int tmp_ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < tenant_schemas.count(); ++i) { + DEBUG_SYNC(BEFORE_CHECK_LOCALITY); bool alter_locality_finish = false; bool meta_alter_locality_finish = false; + int check_ret = OB_NEED_WAIT; uint64_t tenant_id = OB_INVALID_TENANT_ID; - if (OB_ISNULL(tenant_schemas.at(i))) { + int64_t job_id = 0; + ObCurTraceId::init(GCONF.self_addr_); + if (OB_ISNULL(tenant_schemas.at(i)) || OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("tenant schema is null", KR(ret), "schema", tenant_schemas.at(i)); + LOG_WARN("tenant schema or GCTX.sql_proxy_ is null", KR(ret), "schema", tenant_schemas.at(i), + KP(GCTX.sql_proxy_)); } else if (!tenant_schemas.at(i)->is_valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid tenant schema", KR(ret), "schema", tenant_schemas.at(i)); } else if (FALSE_IT(tenant_id = tenant_schemas.at(i)->get_tenant_id())) { // shall never be here - } else if (is_meta_tenant(tenant_id)) { - // by pass, no need to check meta tenant, because it will be checked under user tenant - } else if (tenant_schemas.at(i)->get_previous_locality_str().empty()) { - // by pass, no locality alteration for this tenant + } else if (is_meta_tenant(tenant_id) + || tenant_schemas.at(i)->get_previous_locality_str().empty()) { + continue; + } else if (OB_FAIL(find_rs_job(tenant_id, job_id, *GCTX.sql_proxy_))) { + // find the corresponding rs job at first, then check if we can complete it + // if we only find the rs job at the committing period, + // we do not know whether the job has been changed during checking process + // e.g. job 1 is the rs job before checking, + // right after checking, job 2 is created and job 1 is canceled by job 2, + // then committing process will find job 2 and complete job 2 immediately, + // which means, job 2 is completed without checking. 
+ if (OB_ENTRY_NOT_EXIST == ret) { + FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] there exists locality changing without corresponding rs job", + KR(ret), KPC(tenant_schemas.at(i))); + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job", KR(ret), K(tenant_id)); + } + } + if (OB_FAIL(ret)) { } else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match( tenant_id, *unit_mgr_, @@ -129,13 +150,18 @@ int ObAlterLocalityFinishChecker::check() meta_alter_locality_finish))){ LOG_WARN("fail to check tenant locality match", KR(tmp_ret), "meta_tenant_id", gen_meta_tenant_id(tenant_id), K(meta_alter_locality_finish)); + } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); } else if (alter_locality_finish - && (meta_alter_locality_finish || is_sys_tenant(tenant_id))) { + && OB_NEED_WAIT != check_ret + && (meta_alter_locality_finish || is_sys_tenant(tenant_id))) { DEBUG_SYNC(BEFORE_FINISH_LOCALITY); const int64_t timeout = GCONF.internal_sql_execute_timeout; // 30s default rootserver::ObCommitAlterTenantLocalityArg arg; arg.tenant_id_ = tenant_id; arg.exec_tenant_id_ = OB_SYS_TENANT_ID; + arg.rs_job_id_ = job_id; + arg.rs_job_check_ret_ = check_ret; if (OB_FAIL(check_stop())) { LOG_WARN("ObAlterLocalityFinishChecker stopped", KR(ret)); } else if (OB_SUCCESS != (tmp_ret = common_rpc_proxy_->to(self_).timeout(timeout).commit_alter_tenant_locality(arg))) { @@ -147,6 +173,28 @@ int ObAlterLocalityFinishChecker::check() return ret; } +int ObAlterLocalityFinishChecker::find_rs_job( + const uint64_t tenant_id, + int64_t &job_id, + ObISQLClient &sql_proxy) +{ + int ret = OB_SUCCESS; + if (OB_SUCC(RS_JOB_FIND( + ALTER_TENANT_LOCALITY, + job_id, + sql_proxy, + "tenant_id", tenant_id))) { + // good, find job + } else if (OB_ENTRY_NOT_EXIST == ret && OB_SUCC(RS_JOB_FIND( + ROLLBACK_ALTER_TENANT_LOCALITY, + job_id, + sql_proxy, + "tenant_id", tenant_id))) { + // 
good, find job + } + return ret; +} + int ObAlterLocalityFinishChecker::check_stop() const { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_alter_locality_finish_checker.h b/src/rootserver/ob_alter_locality_finish_checker.h index 8ccfd2450..b6bb28fc1 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.h +++ b/src/rootserver/ob_alter_locality_finish_checker.h @@ -41,11 +41,16 @@ struct ObCommitAlterTenantLocalityArg : public obrpc::ObDDLArg { OB_UNIS_VERSION(1); public: - ObCommitAlterTenantLocalityArg() : tenant_id_(common::OB_INVALID_ID) {} + ObCommitAlterTenantLocalityArg() : + tenant_id_(common::OB_INVALID_ID), + rs_job_id_(0), + rs_job_check_ret_(OB_NEED_WAIT) {} bool is_valid() const { return common::OB_INVALID_ID != tenant_id_;} - TO_STRING_KV(K_(tenant_id)); + TO_STRING_KV(K_(tenant_id), K_(rs_job_id), K_(rs_job_check_ret)); uint64_t tenant_id_; + int64_t rs_job_id_; + int rs_job_check_ret_; }; class ObAlterLocalityFinishChecker : public share::ObCheckStopProvider @@ -63,7 +68,8 @@ public: common::ObMySQLProxy &sql_proxy, share::ObLSTableOperator &lst_operator); int check(); - + static int find_rs_job(const uint64_t tenant_id, int64_t &job_id, ObISQLClient &sql_proxy); + private: //check whether this checker is stopped virtual int check_stop() const override; diff --git a/src/rootserver/ob_alter_primary_zone_checker.cpp b/src/rootserver/ob_alter_primary_zone_checker.cpp new file mode 100644 index 000000000..9b0bed793 --- /dev/null +++ b/src/rootserver/ob_alter_primary_zone_checker.cpp @@ -0,0 +1,221 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. 
+ * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX RS + +#include "ob_alter_primary_zone_checker.h" + +#include "ob_balance_ls_primary_zone.h" + +namespace oceanbase +{ +using namespace common; +using namespace share; +namespace rootserver +{ +ObAlterPrimaryZoneChecker::ObAlterPrimaryZoneChecker(volatile bool &is_stopped) + :is_stopped_(is_stopped), + is_inited_(false), + schema_service_(NULL) +{ +} +ObAlterPrimaryZoneChecker::~ObAlterPrimaryZoneChecker() +{ +} +int ObAlterPrimaryZoneChecker::init(share::schema::ObMultiVersionSchemaService &schema_service) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("init twice", KR(ret), K(is_inited_)); + } else { + schema_service_ = &schema_service; + is_inited_ = true; + } + return ret; +} + +int ObAlterPrimaryZoneChecker::check() +{ + int ret = OB_SUCCESS; + LOG_INFO("start check primary zone"); + ObArray tenant_ids; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObAlterPrimaryZoneChecker not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema_service_ is null", KR(ret), KP(schema_service_)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("ObAlterPrimaryZoneChecker stop", KR(ret)); + } else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) { + LOG_WARN("fail to get tenant id array", KR(ret)); + } else { + int tmp_ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { + // for each tenant, check if it has rs job ALTER_TENANT_PRIMARY_ZONE + // if yes, check whether ls is balanced + // **TODO(linqiucen.lqc): check 
__all_ls_election_reference_info + const uint64_t tenant_id = tenant_ids.at(i); + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); + } else if (is_meta_tenant(tenant_id)) { + // skip + } else { + ObCurTraceId::init(GCONF.self_addr_); + LOG_INFO("start check primary zone", K(tenant_id)); + DEBUG_SYNC(BEFORE_CHECK_PRIMARY_ZONE); + if (OB_TMP_FAIL(check_primary_zone_for_each_tenant_(tenant_id))) { + LOG_WARN("fail to execute check_primary_zone_for_each_tenant", KR(ret), KR(tmp_ret), K(tenant_id)); + } + } + } + } + return ret; +} + +int ObAlterPrimaryZoneChecker::create_alter_tenant_primary_zone_rs_job_if_needed( + const obrpc::ObModifyTenantArg &arg, + const uint64_t tenant_id, + const share::schema::ObTenantSchema &orig_tenant_schema, + const share::schema::ObTenantSchema &new_tenant_schema, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + ObArray orig_first_primary_zone; + ObArray new_first_primary_zone; + bool is_primary_zone_changed = false; + uint64_t tenant_data_version = 0; + bool need_skip = false; + if (!arg.alter_option_bitset_.has_member(obrpc::ObModifyTenantArg::PRIMARY_ZONE)) { + need_skip = true; + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_WARN("fail to get min data version", KR(ret), K(tenant_id)); + } else if (tenant_data_version < DATA_VERSION_4_2_1_0) { + need_skip = true; + } else { + // step 1: cancel rs job ALTER_TENANT_PRIMARY_ZONE if exists + int64_t job_id = 0; + if(OB_FAIL(RS_JOB_FIND( + ALTER_TENANT_PRIMARY_ZONE, + job_id, + trans, + "tenant_id", tenant_id))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job ALTER_TENANT_PRIMARY_ZONE", KR(ret)); + } + } else { + ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); + FLOG_INFO("[ALTER_TENANT_PRIMARY_ZONE NOTICE] cancel an old inprogress rs job", KR(ret), + K(tenant_id), K(job_id)); + } + } + int64_t new_job_id = 
0; + if (OB_FAIL(ret) || need_skip) { + } else if (OB_FAIL(ObRootUtils::is_first_priority_primary_zone_changed( + orig_tenant_schema, + new_tenant_schema, + orig_first_primary_zone, + new_first_primary_zone, + is_primary_zone_changed))) { + // all the necessary checks such as input validity are in this func + LOG_WARN("fail to execute is_first_priority_primary_zone_changed", KR(ret), + K(orig_tenant_schema), K(new_first_primary_zone)); + } else { + // step 2: create a new rs job ALTER_TENANT_PRIMARY_ZONE + ret = RS_JOB_CREATE_WITH_RET( + new_job_id, + JOB_TYPE_ALTER_TENANT_PRIMARY_ZONE, + trans, + "tenant_id", tenant_id, + "tenant_name", new_tenant_schema.get_tenant_name(), + "sql_text", ObHexEscapeSqlStr(arg.ddl_stmt_str_), + "extra_info", orig_tenant_schema.get_primary_zone()); + FLOG_INFO("[ALTER_TENANT_PRIMARY_ZONE NOTICE] create a new rs job", KR(ret), K(arg), K(new_job_id)); + if (OB_SUCC(ret) && !is_primary_zone_changed) { + // step 3: complete the rs job if the first priority primary zone is not changed + // otherwise wait for alter_primary_zone_checker to complete it + ret = RS_JOB_COMPLETE(new_job_id, 0, trans); + FLOG_INFO("[ALTER_TENANT_PRIMARY_ZONE NOTICE] no change of ls, complete immediately", KR(ret), K(new_job_id)); + } + } + return ret; +} + +int ObAlterPrimaryZoneChecker::check_primary_zone_for_each_tenant_(uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + // step 1: find rs job ALTER_TENANT_PRIMARY_ZONE + // step 2: check ls balance if rs job exists + // step 3: complete the rs job if ls is balanced + // if we only find the rs job at the committing period, + // we do not know whether the job has been changed during checking process + // e.g. job 1 is the rs job before checking, + // right after checking, job 2 is created and job 1 is canceled by job 2, + // then committing process will find job 2 and complete job 2 immediately, + // which means, job 2 is completed without checking. 
+ int64_t job_id = 0; + int check_ret = OB_NEED_WAIT; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(RS_JOB_FIND( + ALTER_TENANT_PRIMARY_ZONE, + job_id, + *GCTX.sql_proxy_, + "tenant_id", tenant_id))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job ALTER_TENANT_PRIMARY_ZONE", KR(ret), K(tenant_id)); + } + } else if (is_sys_tenant(tenant_id) + && OB_FAIL(ObBalanceLSPrimaryZone::check_sys_ls_primary_zone_balanced(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_sys_ls_primary_zone_balanced", KR(ret), K(tenant_id)); + } else if (is_user_tenant(tenant_id) + && OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); + } else if (OB_NEED_WAIT != check_ret) { + DEBUG_SYNC(BEFORE_FINISH_PRIMARY_ZONE); + common::ObMySQLTransaction trans; + if (OB_FAIL(check_stop())) { + LOG_WARN("ObAlterPrimaryZoneChecker stop", KR(ret)); + } else if (OB_FAIL(RS_JOB_COMPLETE(job_id, check_ret, *GCTX.sql_proxy_))) { + if (OB_EAGAIN == ret) { + FLOG_WARN("[ALTER_TENANT_PRIMARY_ZONE NOTICE] the specified rs job might has been already " + "completed due to a new job or deleted in table manually", + KR(ret), K(tenant_id), K(job_id), K(check_ret)); + ret = OB_SUCCESS; // no need to return error code + } else { + LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret)); + } + } else { + FLOG_INFO("[ALTER_TENANT_PRIMARY_ZONE NOTICE] complete an inprogress rs job", KR(ret), + K(tenant_id), K(job_id), K(check_ret)); + } + } + return ret; +} + +int ObAlterPrimaryZoneChecker::check_stop() const +{ + int ret = OB_SUCCESS; + if (is_stopped_) { + ret = OB_CANCELED; + LOG_WARN("ObAlterPrimaryZoneChecker stopped", KR(ret), K(is_stopped_)); + } + return ret; +} +} // end namespace rootserver +} // end namespace oceanbase \ 
No newline at end of file diff --git a/src/rootserver/ob_alter_primary_zone_checker.h b/src/rootserver/ob_alter_primary_zone_checker.h new file mode 100644 index 000000000..fb7778a5c --- /dev/null +++ b/src/rootserver/ob_alter_primary_zone_checker.h @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#ifndef OCEANBASE_ROOTSERVER_OB_ALTER_PRIMARY_ZONE_CHECKER_H_ +#define OCEANBASE_ROOTSERVER_OB_ALTER_PRIMARY_ZONE_CHECKER_H_ +#include "share/ob_define.h" +#include "ob_root_utils.h" +namespace oceanbase +{ +namespace rootserver +{ +class ObAlterPrimaryZoneChecker : public share::ObCheckStopProvider +{ +public: + ObAlterPrimaryZoneChecker(volatile bool &is_stopped); + virtual ~ObAlterPrimaryZoneChecker(); + int init(share::schema::ObMultiVersionSchemaService &schema_service); + int check(); + static int create_alter_tenant_primary_zone_rs_job_if_needed( + const obrpc::ObModifyTenantArg &arg, + const uint64_t tenant_id, + const share::schema::ObTenantSchema &orig_tenant_schema, + const share::schema::ObTenantSchema &new_tenant_schema, + ObMySQLTransaction &trans); +private: + virtual int check_stop() const override; + int check_primary_zone_for_each_tenant_(uint64_t tenant_id); + volatile bool &is_stopped_; + bool is_inited_; + share::schema::ObMultiVersionSchemaService *schema_service_; +private: + DISALLOW_COPY_AND_ASSIGN(ObAlterPrimaryZoneChecker); +}; +} // end namespace rootserver +} // end namespace oceanbase + +#endif // 
OCEANBASE_ROOTSERVER_OB_ALTER_PRIMARY_ZONE_CHECKER_H_ \ No newline at end of file diff --git a/src/rootserver/ob_balance_ls_primary_zone.cpp b/src/rootserver/ob_balance_ls_primary_zone.cpp index b8a6843b9..cbff4e0e0 100755 --- a/src/rootserver/ob_balance_ls_primary_zone.cpp +++ b/src/rootserver/ob_balance_ls_primary_zone.cpp @@ -260,35 +260,98 @@ int ObBalanceLSPrimaryZone::balance_ls_primary_zone_( int ObBalanceLSPrimaryZone::try_update_ls_primary_zone( const share::ObLSPrimaryZoneInfo &primary_zone_info, const common::ObZone &new_primary_zone, - const common::ObSqlString &zone_priority) + const common::ObSqlString &new_zone_priority) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!primary_zone_info.is_valid() - || new_primary_zone.is_empty() || zone_priority.empty())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("primary zone info is invalid", KR(ret), K(primary_zone_info), - K(new_primary_zone), K(zone_priority)); - } else if (OB_ISNULL(GCTX.sql_proxy_)) { + bool need_update = false; + if (OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql proxy is null", KR(ret)); - } else if (new_primary_zone != primary_zone_info.get_primary_zone() - || zone_priority.string() != primary_zone_info.get_zone_priority_str()) { + LOG_WARN("sql proxy is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(need_update_ls_primary_zone( + primary_zone_info, + new_primary_zone, + new_zone_priority, + need_update))) { + LOG_WARN("fail to check need_update_ls_primary_zone", KR(ret), K(primary_zone_info), + K(new_primary_zone), K(new_zone_priority)); + } else if (need_update) { ObLSLifeAgentManager ls_life_agent(*GCTX.sql_proxy_); if (OB_FAIL(ls_life_agent.update_ls_primary_zone(primary_zone_info.get_tenant_id(), primary_zone_info.get_ls_id(), - new_primary_zone, zone_priority.string()))) { + new_primary_zone, new_zone_priority.string()))) { LOG_WARN("failed to update ls primary zone", KR(ret), K(primary_zone_info), - K(new_primary_zone), K(zone_priority)); + 
K(new_primary_zone), K(new_zone_priority)); } LOG_INFO("update ls primary zone", KR(ret), K(new_primary_zone), - K(zone_priority), K(primary_zone_info)); + K(new_zone_priority), K(primary_zone_info)); } else { //no need update } return ret; } +int ObBalanceLSPrimaryZone::need_update_ls_primary_zone ( + const share::ObLSPrimaryZoneInfo &primary_zone_info, + const common::ObZone &new_primary_zone, + const common::ObSqlString &new_zone_priority, + bool &need_update) +{ + int ret = OB_SUCCESS; + need_update = false; + if (OB_UNLIKELY(!primary_zone_info.is_valid() + || new_primary_zone.is_empty() || new_zone_priority.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("primary zone info is invalid", KR(ret), K(primary_zone_info), + K(new_primary_zone), K(new_zone_priority)); + } else if (new_primary_zone != primary_zone_info.get_primary_zone() + || new_zone_priority.string() != primary_zone_info.get_zone_priority_str()) { + need_update = true; + } + return ret; +} + int ObBalanceLSPrimaryZone::try_update_sys_ls_primary_zone(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + share::ObLSPrimaryZoneInfo primary_zone_info; + ObZone new_primary_zone; + ObSqlString new_zone_priority; + if (OB_UNLIKELY(is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("user tenant no need update sys ls primary zone", KR(ret), K(tenant_id)); + } else if (OB_FAIL(prepare_sys_ls_balance_primary_zone_info( + tenant_id, + primary_zone_info, + new_primary_zone, + new_zone_priority))) { + LOG_WARN("fail to prepare sys_ls_balance_primary_zone_info", KR(ret), K(tenant_id)); + } else if (OB_FAIL(try_update_ls_primary_zone( + primary_zone_info, new_primary_zone, new_zone_priority))) { + LOG_WARN("failed to update ls primary zone", KR(ret), K(primary_zone_info), + K(new_primary_zone), K(new_zone_priority)); + } else if (is_meta_tenant(tenant_id)) { + //user sys ls has same primary zone with meta sys ls + share::ObLSPrimaryZoneInfo user_primary_zone_info; + share::ObLSStatusOperator 
status_op; + const uint64_t user_tenant_id = gen_user_tenant_id(tenant_id); + if (OB_FAIL(status_op.get_ls_primary_zone_info(user_tenant_id, SYS_LS, + user_primary_zone_info, *GCTX.sql_proxy_))) { + LOG_WARN("failed to get ls primary_zone info", KR(ret), K(tenant_id), K(user_tenant_id)); + } else if (OB_FAIL(try_update_ls_primary_zone( + user_primary_zone_info, new_primary_zone, + new_zone_priority))) { + LOG_WARN("failed to update ls primary zone", KR(ret), K(user_primary_zone_info), + K(new_primary_zone), K(new_zone_priority)); + } + } + return ret; +} + +int ObBalanceLSPrimaryZone::prepare_sys_ls_balance_primary_zone_info( + const uint64_t tenant_id, + share::ObLSPrimaryZoneInfo &primary_zone_info, + common::ObZone &new_primary_zone, + common::ObSqlString &new_zone_priority) { int ret = OB_SUCCESS; share::schema::ObTenantSchema tenant_schema; @@ -297,15 +360,12 @@ int ObBalanceLSPrimaryZone::try_update_sys_ls_primary_zone(const uint64_t tenant LOG_WARN("unexpected error", KR(ret), KP(GCTX.sql_proxy_), KP(GCTX.schema_service_)); } else if (OB_UNLIKELY(is_user_tenant(tenant_id))) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("user tenant no need update sys ls primary zone", KR(ret), K(tenant_id)); + LOG_WARN("only meta ans sys tenant are allowed", KR(ret), K(tenant_id)); } else if (OB_FAIL(ObTenantThreadHelper::get_tenant_schema(tenant_id, tenant_schema))) { LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id)); } else { - share::ObLSPrimaryZoneInfo primary_zone_info; - ObArray primary_zone; share::ObLSStatusOperator status_op; - ObZone new_primary_zone; - ObSqlString new_zone_priority; + ObArray primary_zone; if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_primary_zone_array( tenant_schema, primary_zone))) { LOG_WARN("failed to get tenant primary zone array", KR(ret), K(tenant_schema)); @@ -326,31 +386,45 @@ int ObBalanceLSPrimaryZone::try_update_sys_ls_primary_zone(const uint64_t tenant } else if (is_sys_tenant(tenant_id)) { //sys tenant use tenant normalize 
primary zone if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_zone_priority( - tenant_schema, new_zone_priority))) { + tenant_schema, new_zone_priority))) { LOG_WARN("failed to get tenant primary zone array", KR(ret), K(tenant_schema)); } } else if (OB_FAIL(ObTenantThreadHelper::get_zone_priority(new_primary_zone, - tenant_schema, new_zone_priority))) { + tenant_schema, new_zone_priority))) { LOG_WARN("failed to get normalize primary zone", KR(ret), K(new_primary_zone)); } - if (FAILEDx(try_update_ls_primary_zone( - primary_zone_info, new_primary_zone, new_zone_priority))) { - LOG_WARN("failed to update ls primary zone", KR(ret), K(primary_zone_info), - K(new_primary_zone), K(new_zone_priority)); - } else if (is_meta_tenant(tenant_id)) { - //user sys ls has same primary zone with meta sys ls - share::ObLSPrimaryZoneInfo user_primary_zone_info; - const uint64_t user_tenant_id = gen_user_tenant_id(tenant_id); - if (OB_FAIL(status_op.get_ls_primary_zone_info(user_tenant_id, SYS_LS, - user_primary_zone_info, *GCTX.sql_proxy_))) { - LOG_WARN("failed to get ls primary_zone info", KR(ret), K(tenant_id), K(user_tenant_id)); - } else if (OB_FAIL(try_update_ls_primary_zone( - user_primary_zone_info, new_primary_zone, - new_zone_priority))) { - LOG_WARN("failed to update ls primary zone", KR(ret), K(user_primary_zone_info), - K(new_primary_zone), K(new_zone_priority)); - } - } + } + return ret; +} + +int ObBalanceLSPrimaryZone::check_sys_ls_primary_zone_balanced(const uint64_t tenant_id, int &check_ret) +{ + int ret = OB_SUCCESS; + share::ObLSPrimaryZoneInfo primary_zone_info; + ObZone new_primary_zone; + ObSqlString new_zone_priority; + bool need_update = false; + check_ret = OB_NEED_WAIT; + if (OB_UNLIKELY(is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("only meta ans sys tenant are allowed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(prepare_sys_ls_balance_primary_zone_info( + tenant_id, + primary_zone_info, + new_primary_zone, + new_zone_priority))) { + 
LOG_WARN("fail to prepare sys_ls_balance_primary_zone_info", KR(ret), K(tenant_id)); + } else if (OB_FAIL(need_update_ls_primary_zone( + primary_zone_info, + new_primary_zone, + new_zone_priority, + need_update))) { + LOG_WARN("fail to check need_update_ls_primary_zone", KR(ret), K(primary_zone_info), + K(new_primary_zone), K(new_zone_priority)); + } else if (!need_update) { + check_ret = OB_SUCCESS; + } else { + check_ret = OB_NEED_WAIT; } return ret; } diff --git a/src/rootserver/ob_balance_ls_primary_zone.h b/src/rootserver/ob_balance_ls_primary_zone.h index b72e6dbba..531fbac34 100644 --- a/src/rootserver/ob_balance_ls_primary_zone.h +++ b/src/rootserver/ob_balance_ls_primary_zone.h @@ -51,7 +51,18 @@ public: static int try_update_ls_primary_zone( const share::ObLSPrimaryZoneInfo &primary_zone_info, const common::ObZone &new_primary_zone, - const common::ObSqlString &zone_priority); + const common::ObSqlString &new_zone_priority); + static int need_update_ls_primary_zone ( + const share::ObLSPrimaryZoneInfo &primary_zone_info, + const common::ObZone &new_primary_zone, + const common::ObSqlString &new_zone_priority, + bool &need_update); + static int prepare_sys_ls_balance_primary_zone_info( + const uint64_t tenant_id, + share::ObLSPrimaryZoneInfo &primary_zone_info, + common::ObZone &new_primary_zone, + common::ObSqlString &new_zone_priority); + static int check_sys_ls_primary_zone_balanced(const uint64_t tenant_id, int &check_ret); private: static int adjust_primary_zone_by_ls_group_(const common::ObIArray &primary_zone_array, diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index ebb5446d8..fda21c6fb 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -72,6 +72,7 @@ #include "rootserver/ddl_task/ob_ddl_retry_task.h" #include "rootserver/freeze/ob_freeze_info_manager.h" #include "rootserver/freeze/ob_major_freeze_helper.h" +#include "rootserver/ob_alter_primary_zone_checker.h" #include 
"rootserver/ob_tenant_thread_helper.h"//get_zone_priority #include "lib/utility/ob_tracepoint.h" #include "observer/ob_server_struct.h" @@ -22453,28 +22454,29 @@ int ObDDLService::commit_alter_tenant_locality( LOG_WARN("fail to alter meta tenant", KR(ret)); } } - if (OB_SUCC(ret)) { - //do rs_job - ObRsJobInfo job_info; - if (OB_SUCC(RS_JOB_FIND(job_info, trans, - "job_type", "ALTER_TENANT_LOCALITY", - "job_status", "INPROGRESS", - "tenant_id", arg.tenant_id_))) { - // good, find job - } else if (OB_SUCC(RS_JOB_FIND(job_info, trans, - "job_type", "ROLLBACK_ALTER_TENANT_LOCALITY", - "job_status", "INPROGRESS", - "tenant_id", arg.tenant_id_))) { - // good, find job - } else { - LOG_WARN("failed to find job", KR(ret), "tenant_id", arg.tenant_id_); - } - if (OB_SUCC(ret) && job_info.job_id_ > 0) { - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, 0, trans))) { - LOG_WARN("do rs_job update failed", K(ret), K(job_info)); - } - } - } + int return_code = arg.rs_job_check_ret_; + int64_t job_id = arg.rs_job_id_; + if (OB_FAIL(ret)) { + } else if (OB_SUCC(RS_JOB_COMPLETE(job_id, return_code, trans))) { + FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] complete an inprogress rs job", KR(ret), + K(arg), K(return_code)); + } else { + LOG_WARN("fail to complete rs job", KR(ret), K(job_id), K(return_code)); + if (OB_EAGAIN == ret) { + int64_t find_job_id = 0; + if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(arg.tenant_id_, find_job_id, trans))) { + LOG_WARN("fail to find rs job", KR(ret), K(arg)); + if (OB_ENTRY_NOT_EXIST == ret) { + FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] the specified rs job might has " + "been already deleted in table manually", KR(ret), K(arg), K(return_code)); + ret = OB_SUCCESS; + } + } else { + ret = OB_EAGAIN; + FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] a non-checked rs job cannot be committed", KR(ret), K(arg), K(find_job_id)); + } + } + } } int temp_ret = OB_SUCCESS; if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) { @@ -22623,7 +22625,13 @@ int 
ObDDLService::set_new_tenant_options( K(ret), K(new_tenant_schema), K(orig_tenant_schema)); } else {} // no more to do } else if (TO_NEW_LOCALITY == alter_locality_type) { - if (OB_FAIL(try_modify_tenant_locality( + if (arg.alter_option_bitset_.has_member(obrpc::ObModifyTenantArg::FORCE_LOCALITY)) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("only locality rollback can be forced", KR(ret), K(arg)); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "only locality rollback can be forced, " + "forcing to be in a new locality is"); // forcing to be in a new locality is not allowed + } + if (FAILEDx(try_modify_tenant_locality( arg, new_tenant_schema, orig_tenant_schema, zones_in_pool, zone_region_list, alter_locality_op))) { LOG_WARN("fail to try modify tenant locality", @@ -22775,7 +22783,7 @@ int ObDDLService::try_rollback_modify_tenant_locality( } else if (0 < alter_paxos_tasks.count() || non_paxos_locality_modified) { if (arg.alter_option_bitset_.has_member(obrpc::ObModifyTenantArg::FORCE_LOCALITY)) { if (OB_FAIL(new_schema.set_previous_locality(""))) { - LOG_WARN("fail to set previous locality", K(ret)); + LOG_WARN("fail to set previous locality", KR(ret)); } } else { if (OB_FAIL(new_schema.set_previous_locality(orig_schema.get_locality_str()))) { @@ -23554,7 +23562,19 @@ int ObDDLService::modify_tenant_inner_phase(const ObModifyTenantArg &arg, const #endif } - if (OB_FAIL(ret)) { + if (FAILEDx(ObAlterPrimaryZoneChecker::create_alter_tenant_primary_zone_rs_job_if_needed( + arg, + tenant_id, + *orig_tenant_schema, + new_tenant_schema, + trans))) { + // if the command is alter tenant primary zone, we need to check whether first priority zone + // has been changed. if so, the number of ls will be changed as well. 
+ // when the change occurs, we need to create a rs job ALTER_TENANT_PRIMARY_ZONE to + // track if the number of ls matches the number of first primary zone + // otherwise, the rs job is completed immediately + LOG_WARN("fail to execute create_alter_tenant_primary_zone_rs_job_if_needed", KR(ret), + K(arg), K(tenant_id), KPC(orig_tenant_schema), K(new_tenant_schema)); } else if (OB_FAIL(ddl_operator.alter_tenant(new_tenant_schema, trans, &arg.ddl_stmt_str_))) { LOG_WARN("failed to alter tenant", K(ret)); } else if (OB_FAIL(try_alter_meta_tenant_schema( @@ -23654,7 +23674,6 @@ int ObDDLService::modify_tenant_inner_phase(const ObModifyTenantArg &arg, const return ret; } - // not used // When alter tenant, tenant option and sys variable are both set to readonly, // the current implementation is based on sys variable @@ -23793,64 +23812,47 @@ int ObDDLService::record_tenant_locality_event_history( ObMySQLTransaction &trans) { int ret = OB_SUCCESS; - if (ALTER_LOCALITY == alter_locality_op) { - int64_t job_id = RS_JOB_CREATE(ALTER_TENANT_LOCALITY, trans, - "tenant_name", tenant_schema.get_tenant_name(), - "tenant_id", tenant_schema.get_tenant_id(), - "sql_text", ObHexEscapeSqlStr(arg.ddl_stmt_str_), - "extra_info", tenant_schema.get_previous_locality_str()); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } - } else if (ROLLBACK_ALTER_LOCALITY == alter_locality_op) { - ObRsJobInfo job_info; - if (OB_SUCC(RS_JOB_FIND(job_info, trans, - "job_type", "ALTER_TENANT_LOCALITY", - "job_status", "INPROGRESS", - "tenant_id", tenant_schema.get_tenant_id()))) { - //good find job - } else if (OB_SUCC(RS_JOB_FIND(job_info, trans, - "job_type", "ROLLBACK_ALTER_TENANT_LOCALITY", - "job_status", "INPROGRESS", - "tenant_id", tenant_schema.get_tenant_id()))) { - //good find job - } else { - LOG_WARN("failed to find job need rollback", K(ret), K(tenant_schema.get_tenant_id())); - } - if 
(OB_SUCC(ret) && job_info.job_id_ > 0) { - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, -1, trans))) {// The change task is rolled back, this change failed - LOG_WARN("update rs_job failed", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } - } else { - LOG_WARN("failed to find rs job", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } - if (OB_SUCC(ret)) { - int64_t job_id = RS_JOB_CREATE(ROLLBACK_ALTER_TENANT_LOCALITY, trans, - "tenant_name", tenant_schema.get_tenant_name(), - "tenant_id", tenant_schema.get_tenant_id(), - "sql_text", ObHexEscapeSqlStr(arg.ddl_stmt_str_), - "extra_info", tenant_schema.get_locality_str()); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } - } - } else if (NOP_LOCALITY_OP == alter_locality_op) { - int64_t job_id = RS_JOB_CREATE(ALTER_TENANT_LOCALITY, trans, - "tenant_name", tenant_schema.get_tenant_name(), - "tenant_id", tenant_schema.get_tenant_id(), - "sql_text", ObHexEscapeSqlStr(arg.ddl_stmt_str_), - "extra_info", tenant_schema.get_previous_locality_str()); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } else if (OB_FAIL(RS_JOB_COMPLETE(job_id, 0, trans))) {// The change task is rolled back, this change failed - LOG_WARN("complete rs_job failed", K(ret), "tenant_id", tenant_schema.get_tenant_id()); - } - } else { + uint64_t tenant_data_version = 0; + int64_t job_id = 0; + ObRsJobType job_type = JOB_TYPE_INVALID; + if (ALTER_LOCALITY_OP_INVALID == alter_locality_op) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid alter locality op", K(ret), K(alter_locality_op)); + } else if (ROLLBACK_ALTER_LOCALITY == alter_locality_op) { + int64_t job_id = 0; + if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(tenant_schema.get_tenant_id(), job_id, trans))) { + LOG_WARN("failed to find rs job", K(ret), "tenant_id", 
tenant_schema.get_tenant_id()); + } else { + ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); // The change task is rolled back, this change failed + FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] cancel an old inprogress rs job due to rollback", KR(ret), + "tenant_id", tenant_schema.get_tenant_id(), K(job_id)); + } + if (FAILEDx(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, tenant_data_version))) { + LOG_WARN("fail to get sys tenant's min data version", KR(ret)); + } else if (tenant_data_version < DATA_VERSION_4_2_1_0) { + job_type = ObRsJobType::JOB_TYPE_ROLLBACK_ALTER_TENANT_LOCALITY; + } + } + if (OB_SUCC(ret)) { + // ALTER_LOCALITY, ROLLBACK_ALTER_LOCALITY(only 4.2), NOP_LOCALITY_OP + job_type = ObRsJobType::JOB_TYPE_INVALID == job_type ? + ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY : job_type; + ret = RS_JOB_CREATE_WITH_RET(job_id, job_type, trans, + "tenant_name", tenant_schema.get_tenant_name(), + "tenant_id", tenant_schema.get_tenant_id(), + "sql_text", ObHexEscapeSqlStr(arg.ddl_stmt_str_), + "extra_info", tenant_schema.get_previous_locality_str()); + FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] create a new rs job", KR(ret), + "tenant_id", tenant_schema.get_tenant_id(), K(job_id), K(alter_locality_op)); + } + if (OB_SUCC(ret)) { + if ((ROLLBACK_ALTER_LOCALITY == alter_locality_op + && arg.alter_option_bitset_.has_member(obrpc::ObModifyTenantArg::FORCE_LOCALITY)) + || NOP_LOCALITY_OP == alter_locality_op) { + ret = RS_JOB_COMPLETE(job_id, 0, trans); + FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] complete a new rs job immediately", KR(ret), + "tenant_id", tenant_schema.get_tenant_id(), K(job_id), K(alter_locality_op)); + } } return ret; } @@ -30373,6 +30375,12 @@ int ObDDLService::check_alter_tenant_replica_options( new_tenant_schema, orig_tenant_schema, zone_list, schema_guard))) { LOG_WARN("fail to check replica options", K(ret)); } else {} // no more + if (OB_OP_NOT_ALLOW == ret + && arg.alter_option_bitset_.has_member(obrpc::ObModifyTenantArg::FORCE_LOCALITY)) { + ret 
= OB_SUCCESS; + LOG_WARN("FORCE ROLLBACK LOCALITY should skip all checks", KR(ret), K(arg), + K(orig_tenant_schema), K(new_tenant_schema)); + } return ret; } @@ -33595,6 +33603,7 @@ int ObDDLService::check_alter_tenant_when_rebalance_is_disabled_( ObArray orig_first_primary_zone; ObArray new_first_primary_zone; bool is_allowed = true; + bool is_first_primary_zone_changed = false; if (OB_UNLIKELY(orig_tenant_schema.get_tenant_id() != new_tenant_schema.get_tenant_id())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid input tenant schema", KR(ret), K(orig_tenant_schema), K(new_tenant_schema)); @@ -33604,26 +33613,15 @@ int ObDDLService::check_alter_tenant_when_rebalance_is_disabled_( is_allowed = true; } else if (ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { is_allowed = true; - } else if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_primary_zone_array( + } else if (OB_FAIL(ObRootUtils::is_first_priority_primary_zone_changed( orig_tenant_schema, - orig_first_primary_zone))) { - LOG_WARN("fail to get tenant primary zone array", KR(ret), - K(orig_tenant_schema), K(orig_first_primary_zone)); - } else if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_primary_zone_array( new_tenant_schema, - new_first_primary_zone))) { - LOG_WARN("fail to get tenant primary zone array", KR(ret), - K(new_tenant_schema), K(new_first_primary_zone)); - } else if (orig_first_primary_zone.count() != new_first_primary_zone.count()) { + orig_first_primary_zone, + new_first_primary_zone, + is_first_primary_zone_changed))) { + LOG_WARN("fail to check is_first_priority_primary_zone_changed", KR(ret), K(orig_tenant_schema), K(new_tenant_schema)); + } else if (is_first_primary_zone_changed) { is_allowed = false; - } else { - ARRAY_FOREACH(new_first_primary_zone, idx) { - const ObZone &zone = new_first_primary_zone.at(idx); - if (!common::has_exist_in_array(orig_first_primary_zone, zone)) { - is_allowed = false; - break; - } - } } if (OB_SUCC(ret) && !is_allowed) { ObSqlString orig_str; diff --git 
a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index 418d07f24..5c9257c52 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -2309,6 +2309,7 @@ private: int add_sys_table_lob_aux(const int64_t tenant_id, const uint64_t table_id, ObTableSchema &meta_schema, ObTableSchema &data_schema); int check_has_multi_autoinc(share::schema::ObTableSchema &table_schema); + private: #ifdef OB_BUILD_ARBITRATION int check_tenant_arbitration_service_status_( @@ -2378,9 +2379,10 @@ private: const ObIArray &orig_table_schemas, const ObIArray &new_table_schemas, ObMySQLTransaction &trans); -int check_alter_tenant_when_rebalance_is_disabled_( - const share::schema::ObTenantSchema &orig_tenant_schema, - const share::schema::ObTenantSchema &new_tenant_schema); + + int check_alter_tenant_when_rebalance_is_disabled_( + const share::schema::ObTenantSchema &orig_tenant_schema, + const share::schema::ObTenantSchema &new_tenant_schema); private: int check_locality_compatible_(ObTenantSchema &schema); diff --git a/src/rootserver/ob_ls_service_helper.cpp b/src/rootserver/ob_ls_service_helper.cpp index fab56c49a..2774c0632 100755 --- a/src/rootserver/ob_ls_service_helper.cpp +++ b/src/rootserver/ob_ls_service_helper.cpp @@ -899,10 +899,14 @@ int ObLSServiceHelper::create_new_ls_in_trans( return ret; } -int ObLSServiceHelper::balance_ls_group(ObTenantLSInfo& tenant_ls_info) +int ObLSServiceHelper::balance_ls_group( + const bool need_execute_balance, + ObTenantLSInfo& tenant_ls_info, + bool &is_balanced) { int ret = OB_SUCCESS; int64_t task_cnt = 0; + is_balanced = true; if (OB_FAIL(tenant_ls_info.gather_stat())) { LOG_WARN("failed to gather stat", KR(ret)); } else if (OB_FAIL(try_shrink_standby_unit_group_(tenant_ls_info, task_cnt))) { @@ -933,12 +937,13 @@ int ObLSServiceHelper::balance_ls_group(ObTenantLSInfo& tenant_ls_info) } }//end for if (OB_SUCC(ret) && max_count - min_count > 1) { - if 
(OB_FAIL(balance_ls_group_between_unit_group_(tenant_ls_info, min_index, max_index))) { + is_balanced = false; + if (need_execute_balance && OB_FAIL(balance_ls_group_between_unit_group_(tenant_ls_info, min_index, max_index))) { LOG_WARN("failed to balance ls group between unit group", KR(ret), K(tenant_ls_info), K(min_index), K(max_index)); } } - } while (OB_SUCC(ret) && (max_count - min_count) > 1); + } while (OB_SUCC(ret) && (max_count - min_count) > 1 && need_execute_balance); } return ret; } diff --git a/src/rootserver/ob_ls_service_helper.h b/src/rootserver/ob_ls_service_helper.h index ad6a389c1..4d48996cc 100644 --- a/src/rootserver/ob_ls_service_helper.h +++ b/src/rootserver/ob_ls_service_helper.h @@ -230,7 +230,10 @@ public: ObTenantLSInfo& tenant_ls_info, common::ObMySQLTransaction &trans, const share::ObLSFlag &ls_flag); - static int balance_ls_group(ObTenantLSInfo& tenant_ls_info);//for standby tenant + static int balance_ls_group( + const bool need_execute_balance, + ObTenantLSInfo& tenant_ls_info, + bool &is_balanced);//for standby tenant static int update_ls_recover_in_trans( const share::ObLSRecoveryStat &ls_recovery_stat, const bool only_update_readable_scn, diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index a22d4fd04..f0b1f391e 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -1065,7 +1065,9 @@ int ObRecoveryLSService::do_standby_balance_() LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_)); } else { ObTenantLSInfo tenant_info(proxy_, &tenant_schema, tenant_id_); - if (OB_FAIL(ObLSServiceHelper::balance_ls_group(tenant_info))) { + bool is_balanced = false; + bool need_execute_balance = true; + if (OB_FAIL(ObLSServiceHelper::balance_ls_group(need_execute_balance, tenant_info, is_balanced))) { LOG_WARN("failed to balance ls group", KR(ret)); } } diff --git a/src/rootserver/ob_root_utils.cpp b/src/rootserver/ob_root_utils.cpp 
index 02751a22f..4fc6c6f2a 100644 --- a/src/rootserver/ob_root_utils.cpp +++ b/src/rootserver/ob_root_utils.cpp @@ -38,6 +38,7 @@ #include "share/ob_primary_zone_util.h" // ObPrimaryZoneUtil #include "share/ob_server_table_operator.h" #include "share/ob_zone_table_operation.h" +#include "rootserver/ob_tenant_balance_service.h" // for ObTenantBalanceService using namespace oceanbase::rootserver; using namespace oceanbase::share; @@ -2205,6 +2206,72 @@ int ObRootUtils::notify_switch_leader( return ret; } +int ObRootUtils::check_tenant_ls_balance(uint64_t tenant_id, int &check_ret) +{ + int ret = OB_SUCCESS; + uint64_t tenant_data_version = 0; + bool pass = false; + check_ret = OB_NEED_WAIT; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_WARN("fail to get min data version", KR(ret), K(tenant_id)); + } else if (tenant_data_version < DATA_VERSION_4_2_1_0 || !is_user_tenant(tenant_id)) { + // before data version 4.2.1.0 there is no need to check ls balance + // to let rs jobs' return_code be OB_SUCCESS, we set check_ret to OB_SUCCESS + // non-user tenant has no ls balance + check_ret = OB_SUCCESS; + } else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { + check_ret = OB_SKIP_CHECKING_LS_STATUS; + } else if (OB_FAIL(ObTenantBalanceService::is_ls_balance_finished(tenant_id, pass))) { + LOG_WARN("fail to execute is_ls_balance_finished", KR(ret), K(tenant_id)); + } else if (pass) { + check_ret = OB_SUCCESS; + } else { + check_ret = OB_NEED_WAIT; + } + return ret; +} + +int ObRootUtils::is_first_priority_primary_zone_changed( + const share::schema::ObTenantSchema &orig_tenant_schema, + const share::schema::ObTenantSchema &new_tenant_schema, + ObIArray &orig_first_primary_zone, + ObIArray &new_first_primary_zone, + bool &is_changed) +{ + int ret = OB_SUCCESS; + orig_first_primary_zone.reset(); + new_first_primary_zone.reset(); +
is_changed = false; + if (OB_UNLIKELY(orig_tenant_schema.get_tenant_id() != new_tenant_schema.get_tenant_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid input tenant schema", KR(ret), K(orig_tenant_schema), K(new_tenant_schema)); + } else if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_primary_zone_array( + orig_tenant_schema, + orig_first_primary_zone))) { + LOG_WARN("fail to get tenant primary zone array", KR(ret), + K(orig_tenant_schema), K(orig_first_primary_zone)); + } else if (OB_FAIL(ObPrimaryZoneUtil::get_tenant_primary_zone_array( + new_tenant_schema, + new_first_primary_zone))) { + LOG_WARN("fail to get tenant primary zone array", KR(ret), + K(new_tenant_schema), K(new_first_primary_zone)); + } else if (orig_first_primary_zone.count() != new_first_primary_zone.count()) { + is_changed = true; + } else { + ARRAY_FOREACH(new_first_primary_zone, idx) { + const ObZone &zone = new_first_primary_zone.at(idx); + if (!common::has_exist_in_array(orig_first_primary_zone, zone)) { + is_changed = true; + break; + } + } + } + return ret; +} + /////////////////////////////// ObClusterRole ObClusterInfoGetter::get_cluster_role_v2() diff --git a/src/rootserver/ob_root_utils.h b/src/rootserver/ob_root_utils.h index 9fe1edb5c..165dfb01b 100644 --- a/src/rootserver/ob_root_utils.h +++ b/src/rootserver/ob_root_utils.h @@ -645,6 +645,12 @@ public: static int get_primary_zone(ObZoneManager &zone_mgr, const common::ObIArray &zone_score_array, common::ObIArray &primary_zone); + static int is_first_priority_primary_zone_changed( + const share::schema::ObTenantSchema &orig_tenant_schema, + const share::schema::ObTenantSchema &new_tenant_schema, + ObIArray &orig_first_primary_zone, + ObIArray &new_first_primary_zone, + bool &is_changed); template static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr, @@ -673,6 +679,7 @@ public: obrpc::ObSrvRpcProxy *rpc_proxy, const share::ObLSInfo &ls_info, const obrpc::ObNotifySwitchLeaderArg::SwitchLeaderComment &comment); + static 
int check_tenant_ls_balance(uint64_t tenant_id, int &check_ret); }; diff --git a/src/rootserver/ob_rootservice_util_checker.cpp b/src/rootserver/ob_rootservice_util_checker.cpp index 8c36484a1..50e81c501 100644 --- a/src/rootserver/ob_rootservice_util_checker.cpp +++ b/src/rootserver/ob_rootservice_util_checker.cpp @@ -21,7 +21,8 @@ ObRootServiceUtilChecker::ObRootServiceUtilChecker(volatile bool &stop) stop_(stop), migrate_unit_finish_checker_(stop), alter_locality_finish_checker_(stop), - shrink_resource_pool_checker_(stop) + shrink_expand_resource_pool_checker_(stop), + alter_primary_zone_checker_(stop) { } @@ -58,10 +59,13 @@ int ObRootServiceUtilChecker::init( sql_proxy, lst_operator))) { LOG_WARN("fail to init alter locality finish checker", KR(ret)); - } else if (OB_FAIL(shrink_resource_pool_checker_.init(&schema_service, + } else if (OB_FAIL(shrink_expand_resource_pool_checker_.init(&schema_service, &unit_mgr, lst_operator, sql_proxy))) { - LOG_WARN("failed to init shrink resource pool", KR(ret)); - } else { + LOG_WARN("fail to init shrink resource pool", KR(ret)); + } else if (OB_FAIL(alter_primary_zone_checker_.init(schema_service))) { + LOG_WARN("fail to init alter primary zone checker", KR(ret)); + } + else { inited_ = true; } return ret; @@ -76,16 +80,20 @@ int ObRootServiceUtilChecker::rootservice_util_check() } else { int tmp_ret = OB_SUCCESS; // migrate unit finish checker - if (OB_SUCCESS != (tmp_ret = migrate_unit_finish_checker_.check())) { + if (OB_TMP_FAIL(migrate_unit_finish_checker_.check())) { LOG_WARN("fail to check migrate unit finish", KR(tmp_ret)); } // alter locality finish checker - if (OB_SUCCESS != (tmp_ret = alter_locality_finish_checker_.check())) { + if (OB_TMP_FAIL(alter_locality_finish_checker_.check())) { LOG_WARN("fail to check alter locality finish", KR(tmp_ret)); } - if (OB_TMP_FAIL(shrink_resource_pool_checker_.check())) { - LOG_WARN("failed to check shrink resource pool", KR(tmp_ret)); + if 
(OB_TMP_FAIL(shrink_expand_resource_pool_checker_.check())) { + LOG_WARN("fail to check shrink resource pool", KR(tmp_ret)); + } + + if (OB_TMP_FAIL(alter_primary_zone_checker_.check())) { + LOG_WARN("fail to check alter primary zone", KR(tmp_ret)); } } return ret; diff --git a/src/rootserver/ob_rootservice_util_checker.h b/src/rootserver/ob_rootservice_util_checker.h index bb869cafe..15ef3c6d9 100644 --- a/src/rootserver/ob_rootservice_util_checker.h +++ b/src/rootserver/ob_rootservice_util_checker.h @@ -16,7 +16,8 @@ #include "lib/net/ob_addr.h" #include "ob_migrate_unit_finish_checker.h" #include "ob_alter_locality_finish_checker.h" -#include "ob_shrink_resource_pool_checker.h" +#include "ob_shrink_expand_resource_pool_checker.h" +#include "ob_alter_primary_zone_checker.h" namespace oceanbase { @@ -45,7 +46,8 @@ private: volatile bool &stop_; ObMigrateUnitFinishChecker migrate_unit_finish_checker_; ObAlterLocalityFinishChecker alter_locality_finish_checker_; - ObShrinkResourcePoolChecker shrink_resource_pool_checker_; + ObShrinkExpandResourcePoolChecker shrink_expand_resource_pool_checker_; + ObAlterPrimaryZoneChecker alter_primary_zone_checker_; private: DISALLOW_COPY_AND_ASSIGN(ObRootServiceUtilChecker); diff --git a/src/rootserver/ob_rs_job_table_operator.cpp b/src/rootserver/ob_rs_job_table_operator.cpp index 67a71d1fd..b0aa06db5 100644 --- a/src/rootserver/ob_rs_job_table_operator.cpp +++ b/src/rootserver/ob_rs_job_table_operator.cpp @@ -54,10 +54,10 @@ const char* const ObRsJobTableOperator::TABLE_NAME = "__all_rootservice_job"; static const char* job_type_str_array[JOB_TYPE_MAX] = { NULL, "ALTER_TENANT_LOCALITY", - "ROLLBACK_ALTER_TENANT_LOCALITY", + "ROLLBACK_ALTER_TENANT_LOCALITY", // deprecated in V4.2 "MIGRATE_UNIT", "DELETE_SERVER", - "SHRINK_RESOURCE_TENANT_UNIT_NUM", + "SHRINK_RESOURCE_TENANT_UNIT_NUM", // deprecated in V4.2 "RESTORE_TENANT", "UPGRADE_STORAGE_FORMAT_VERSION", "STOP_UPGRADE_STORAGE_FORMAT_VERSION", @@ -71,16 +71,23 @@ static const 
char* job_type_str_array[JOB_TYPE_MAX] = { "UPGRADE_ALL_POST_ACTION", "UPGRADE_INSPECTION", "UPGRADE_END", - "UPGRADE_ALL" + "UPGRADE_ALL", + "ALTER_RESOURCE_TENANT_UNIT_NUM", + "ALTER_TENANT_PRIMARY_ZONE" }; +bool ObRsJobTableOperator::is_valid_job_type(const ObRsJobType &rs_job_type) +{ + return rs_job_type > ObRsJobType::JOB_TYPE_INVALID && rs_job_type < ObRsJobType::JOB_TYPE_MAX; +} + const char* ObRsJobTableOperator::get_job_type_str(ObRsJobType job_type) { STATIC_ASSERT(ARRAYSIZEOF(job_type_str_array) == JOB_TYPE_MAX, "type string array size mismatch with enum ObRsJobType"); const char* str = NULL; - if (job_type > JOB_TYPE_INVALID && job_type < JOB_TYPE_MAX) { + if (is_valid_job_type(job_type)) { str = job_type_str_array[job_type]; } return str; @@ -103,7 +110,9 @@ static const char* job_status_str_array[JOB_STATUS_MAX] = { NULL, "INPROGRESS", "SUCCESS", - "FAILED" + "FAILED", + "SKIP_CHECKING_LS_STATUS", + "CANCELED" }; ObRsJobStatus ObRsJobTableOperator::get_job_status(const common::ObString &job_status_str) @@ -150,7 +159,7 @@ int ObRsJobTableOperator::create_job(ObRsJobType job_type, share::ObDMLSqlSplice { int ret = OB_SUCCESS; const char* job_type_str = NULL; - if (JOB_TYPE_INVALID == job_type + if (!is_valid_job_type(job_type) || NULL == (job_type_str = get_job_type_str(job_type))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid job type", K(ret), K(job_type), K(job_type_str)); @@ -196,10 +205,12 @@ int ObRsJobTableOperator::create_job(ObRsJobType job_type, share::ObDMLSqlSplice } } } + if (OB_SUCC(ret) && job_id < 1) { + ret = OB_SQL_OPT_ERROR; + LOG_WARN("insert into all_rootservice_job failed", KR(ret), K(job_id)); + } if (OB_FAIL(ret)) { job_id = -1; - } else { - (void)delete_rows_if_necessary(); } return ret; } @@ -285,18 +296,32 @@ int ObRsJobTableOperator::cons_job_info(const sqlclient::ObMySQLResult &res, ObR return ret; } -int ObRsJobTableOperator::find_job(share::ObDMLSqlSplicer &pairs, ObRsJobInfo &job_info, common::ObISQLClient &trans) +int 
ObRsJobTableOperator::find_job( + const ObRsJobType job_type, + share::ObDMLSqlSplicer &pairs, + int64_t &job_id, + common::ObISQLClient &trans) { int ret = OB_SUCCESS; + const char* job_type_str = NULL; + job_id = 0; if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); + } else if (!is_valid_job_type(job_type) + || NULL == (job_type_str = get_job_type_str(job_type))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid job type", K(ret), K(job_type), K(job_type_str)); } else { ObSqlString sql; SMART_VAR(common::ObMySQLProxy::MySQLResult, res) { common::sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE ", TABLE_NAME))) { + if (OB_FAIL(sql.assign_fmt("SELECT job_id FROM %s WHERE ", TABLE_NAME))) { LOG_WARN("failed to assign sql", K(ret)); + } else if (OB_FAIL(pairs.add_column("job_type", job_type_str))) { + LOG_WARN("failed to add column", K(ret), K(job_type_str)); + } else if (OB_FAIL(pairs.add_column("job_status", "INPROGRESS"))) { + LOG_WARN("failed to add column", K(ret)); } else if (OB_FAIL(pairs.splice_predicates(sql))) { LOG_WARN("failed to splice predicates", K(ret), K(sql)); } else if (OB_FAIL(sql.append(" ORDER BY job_id DESC LIMIT 1"))) { @@ -309,9 +334,13 @@ int ObRsJobTableOperator::find_job(share::ObDMLSqlSplicer &pairs, ObRsJobInfo &j } else if (OB_FAIL(result->next())) { LOG_WARN("empty result set", K(ret)); ret = OB_ENTRY_NOT_EXIST; - } else if (OB_FAIL(cons_job_info(*result, job_info))) { - LOG_WARN("failed to construct job info", K(ret)); - } else {} + } else { + EXTRACT_INT_FIELD_MYSQL(*result, "job_id", job_id, int64_t); + } + if (OB_SUCC(ret) && job_id < 1) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("find an invalid job", KR(ret), K(sql), K(job_id)); + } } } return ret; @@ -339,10 +368,16 @@ int ObRsJobTableOperator::update_job(int64_t job_id, share::ObDMLSqlSplicer &dml LOG_WARN("splice_insert_sql failed", K(ret)); } else if (OB_FAIL(trans.write(sql.ptr(), affected_rows))) { LOG_WARN("execute 
sql failed", K(sql), K(ret)); - } else if (!is_single_row(affected_rows)) { + } else if (OB_LIKELY(is_single_row(affected_rows))) { + // success + } else if (OB_UNLIKELY(is_zero_row(affected_rows))) { + ret = OB_EAGAIN; + LOG_WARN("[RS_JOB NOTICE] the specified rs job might has been already completed due to a new job" + "or deleted manually", KR(ret), K(affected_rows), K(sql)); + } else { ret = OB_ERR_UNEXPECTED; - LOG_WARN("insert succeeded but affected_rows is not one", K(ret), K(affected_rows)); - } else {} + LOG_WARN("update successfully but more than one row", KR(ret), K(affected_rows), K(sql)); + } } } return ret; @@ -375,6 +410,18 @@ int ObRsJobTableOperator::complete_job(int64_t job_id, int result_code, common:: } else if (OB_FAIL(pairs.add_column("job_status", job_status_str_array[JOB_STATUS_SUCCESS]))) { LOG_WARN("failed to add column", K(ret)); } + } else if (OB_SKIP_CHECKING_LS_STATUS == result_code) { + if (OB_FAIL(pairs.add_column("result_code", result_code))) { + LOG_WARN("failed to add column", K(ret)); + } else if (OB_FAIL(pairs.add_column("job_status", job_status_str_array[JOB_STATUS_SKIP_CHECKING_LS_STATUS]))) { + LOG_WARN("failed to add column", K(ret)); + } + } else if (OB_CANCELED == result_code) { + if (OB_FAIL(pairs.add_column("result_code", result_code))) { + LOG_WARN("failed to add column", K(ret)); + } else if (OB_FAIL(pairs.add_column("job_status", job_status_str_array[JOB_STATUS_CANCELED]))) { + LOG_WARN("failed to add column", K(ret)); + } } else { if (OB_FAIL(pairs.add_column("result_code", result_code))) { LOG_WARN("failed to add column", K(ret)); @@ -384,7 +431,10 @@ int ObRsJobTableOperator::complete_job(int64_t job_id, int result_code, common:: } if (OB_SUCC(ret)) { - if (OB_FAIL(update_job(job_id, pairs, trans))) { + if(OB_FAIL(pairs.get_extra_condition().assign_fmt("job_status='%s'", + job_status_str_array[JOB_STATUS_INPROGRESS]))) { + LOG_WARN("fail to assign extra condition", KR(ret)); + } else if (OB_FAIL(update_job(job_id, 
pairs, trans))) { LOG_WARN("failed to update job", K(ret), K(job_id)); } else { LOG_INFO("rootservice job completed", K(job_id), K(result_code)); @@ -456,42 +506,6 @@ int ObRsJobTableOperator::alloc_job_id(int64_t &job_id) return ret; } -int ObRsJobTableOperator::delete_rows_if_necessary() -{ - int ret = OB_SUCCESS; - if (ATOMIC_LOAD(&row_count_) > 2 * MAX_ROW_COUNT) { - ret = delete_rows(); - } - return ret; -} - -int ObRsJobTableOperator::delete_rows() -{ - int ret = OB_SUCCESS; - if (!inited_) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else { - ObLatchWGuard guard(latch_, ObLatchIds::DEFAULT_MUTEX); - common::ObSqlString sql; - int64_t row_count = ATOMIC_LOAD(&row_count_); - int64_t affected_rows = 0; - if (row_count <= MAX_ROW_COUNT) { - // do nothing - } else if (OB_FAIL(sql.assign_fmt("DELETE FROM %s ORDER BY job_id LIMIT %ld", - TABLE_NAME, row_count-MAX_ROW_COUNT))) { - LOG_WARN("failed to assign sql", K(ret)); - } else if (OB_FAIL(sql_client_->write(sql.ptr(), affected_rows))) { - LOG_WARN("execute sql failed", K(sql), K(ret)); - } else { - LOG_DEBUG("delete old rows from __all_rootservice_job", - K(affected_rows), K(row_count), "sql", sql.ptr()); - (void)ATOMIC_SAF(&row_count_, affected_rows); - } - } - return ret; -} - ObRsJobTableOperator &ObRsJobTableOperatorSingleton::get_instance() { static ObRsJobTableOperator the_one; diff --git a/src/rootserver/ob_rs_job_table_operator.h b/src/rootserver/ob_rs_job_table_operator.h index d48fc8a42..669ddee4b 100644 --- a/src/rootserver/ob_rs_job_table_operator.h +++ b/src/rootserver/ob_rs_job_table_operator.h @@ -73,10 +73,10 @@ enum ObRsJobType { JOB_TYPE_INVALID = 0, JOB_TYPE_ALTER_TENANT_LOCALITY, - JOB_TYPE_ROLLBACK_ALTER_TENANT_LOCALITY, + JOB_TYPE_ROLLBACK_ALTER_TENANT_LOCALITY, // deprecated in V4.2 JOB_TYPE_MIGRATE_UNIT, JOB_TYPE_DELETE_SERVER, - JOB_TYPE_SHRINK_RESOURCE_TENANT_UNIT_NUM, + JOB_TYPE_SHRINK_RESOURCE_TENANT_UNIT_NUM, // deprecated in V4.2 JOB_TYPE_RESTORE_TENANT, 
JOB_TYPE_UPGRADE_STORAGE_FORMAT_VERSION, JOB_TYPE_STOP_UPGRADE_STORAGE_FORMAT_VERSION, @@ -91,6 +91,8 @@ enum ObRsJobType JOB_TYPE_UPGRADE_INSPECTION, JOB_TYPE_UPGRADE_END, JOB_TYPE_UPGRADE_ALL, + JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM, + JOB_TYPE_ALTER_TENANT_PRIMARY_ZONE, JOB_TYPE_MAX }; @@ -100,6 +102,8 @@ enum ObRsJobStatus JOB_STATUS_INPROGRESS, JOB_STATUS_SUCCESS, JOB_STATUS_FAILED, + JOB_STATUS_SKIP_CHECKING_LS_STATUS, + JOB_STATUS_CANCELED, JOB_STATUS_MAX }; @@ -162,6 +166,7 @@ public: static const char* get_job_type_str(ObRsJobType job_type); static ObRsJobType get_job_type(const common::ObString &job_type_str); static ObRsJobStatus get_job_status(const common::ObString &job_status_str); + static bool is_valid_job_type(const ObRsJobType &rs_job_type); public: ObRsJobTableOperator(); virtual ~ObRsJobTableOperator() = default; @@ -173,7 +178,7 @@ public: // get job info by id int get_job(int64_t job_id, ObRsJobInfo &job_info); // find the one job with the search conditions - int find_job(share::ObDMLSqlSplicer &pairs, ObRsJobInfo &job_info, common::ObISQLClient &trans); + int find_job(const ObRsJobType job_type, share::ObDMLSqlSplicer &pairs, int64_t &job_id, common::ObISQLClient &trans); // update the job with the specified values int update_job(int64_t job_id, share::ObDMLSqlSplicer &pairs, common::ObISQLClient &trans); int update_job_progress(int64_t job_id, int64_t progress, common::ObISQLClient &trans); @@ -195,9 +200,6 @@ private: int alloc_job_id(int64_t &job_id); int load_max_job_id(int64_t &max_job_id, int64_t &row_count); int cons_job_info(const common::sqlclient::ObMySQLResult &res, ObRsJobInfo &job_info); - int load_row_count(int64_t &row_count); - int delete_rows_if_necessary(); - int delete_rows(); private: // data members bool inited_; @@ -219,22 +221,9 @@ public: #define THE_RS_JOB_TABLE ::oceanbase::rootserver::ObRsJobTableOperatorSingleton::get_instance() -// usage: int64_t job_id = RS_JOB_CREATE(ALTER_TENANT_LOCALITY, "tenant_id", 1010); 
+// usage: ret = RS_JOB_CREATE_WITH_RET(job_id, JOB_TYPE_ALTER_TENANT_LOCALITY, trans, "tenant_id", 1010); // no need to fill the following column, all these columns are filled automatically: // job_type, job_status, gmt_create, gmt_modified, rs_svr_ip, rs_svr_port, progress(0) -#define RS_JOB_CREATE(job_type, trans, args...) \ - ({ \ - int64_t ret_job_id = -1; \ - int tmp_ret = ::oceanbase::common::OB_SUCCESS; \ - ::oceanbase::share::ObDMLSqlSplicer dml; \ - tmp_ret = ::oceanbase::rootserver::ob_build_dml_elements(dml, ##args); \ - if (::oceanbase::common::OB_SUCCESS == tmp_ret) { \ - THE_RS_JOB_TABLE.create_job(JOB_TYPE_ ## job_type, dml, ret_job_id, (trans)); \ - } \ - ret_job_id; \ - }) - -// just like RS_JOB_CREATE, macro with return code #define RS_JOB_CREATE_WITH_RET(job_id, job_type, trans, args...) \ ({ \ job_id = ::oceanbase::common::OB_INVALID_ID; \ @@ -278,7 +267,10 @@ public: // job finished: // 1. result_code == 0, update status(SUCCESS) and progress(100) automatically -// 2. result_code != 0, update status(FAILED) automatically +// 2. result_code == -4762, update status(SKIP_CHECKING_LS_STATUS), +// the job is finished without checking whether ls are balanced +// 3. result_code == -4072, update status(CANCELED), the job is cancelled by a new job +// 4. else, update status(FAILED) automatically #define RS_JOB_COMPLETE(job_id, result_code, trans) \ THE_RS_JOB_TABLE.complete_job((job_id), (result_code), (trans)) @@ -286,14 +278,14 @@ public: #define RS_JOB_GET(job_id, job_info) \ THE_RS_JOB_TABLE.get_job((job_id), (job_info)) -// usage: ret = RS_JOB_FIND(job_info, "job_status", "INPROGRESS", "job_type", "ALTER_TENANT_LOCALITY", "tenant_id", 1010); -#define RS_JOB_FIND(job_info, trans, args...) \ +// usage: ret = RS_JOB_FIND(ALTER_TENANT_LOCALITY, job_id, trans, "tenant_id", 1010); +#define RS_JOB_FIND(job_type, job_id, trans, args...)
\ ({ \ int tmp_ret = ::oceanbase::common::OB_SUCCESS; \ ::oceanbase::share::ObDMLSqlSplicer dml; \ tmp_ret = ::oceanbase::rootserver::ob_build_dml_elements(dml, ##args); \ if (::oceanbase::common::OB_SUCCESS == tmp_ret) { \ - tmp_ret = THE_RS_JOB_TABLE.find_job(dml, job_info, (trans)); \ + tmp_ret = THE_RS_JOB_TABLE.find_job(JOB_TYPE_ ## job_type, dml, job_id, (trans)); \ } \ tmp_ret; \ }) diff --git a/src/rootserver/ob_server_manager.cpp b/src/rootserver/ob_server_manager.cpp index 22006e0db..6a0a9da48 100644 --- a/src/rootserver/ob_server_manager.cpp +++ b/src/rootserver/ob_server_manager.cpp @@ -255,22 +255,24 @@ int ObServerManager::delete_server(const ObIArray &servers, const ObZone new_server_status = *server_status; } } - if (OB_FAIL(ret)) { - } else { + if (OB_SUCC(ret)) { new_server_status.admin_status_ = ObServerStatus::OB_SERVER_ADMIN_DELETING; ROOTSERVICE_EVENT_ADD("server", "delete_server", K(server)); LOG_INFO("delete server, server change status to deleting", K(server), K(zone)); - char ip_buf[common::MAX_IP_ADDR_LENGTH]; + char ip_buf[common::MAX_IP_ADDR_LENGTH]; (void)server.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - int64_t job_id = RS_JOB_CREATE(DELETE_SERVER, trans, - "svr_ip", ip_buf, "svr_port", server.get_port()); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed ", K(ret)); + int64_t job_id = 0; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET( + job_id, + JOB_TYPE_DELETE_SERVER, + trans, + "svr_ip", ip_buf, + "svr_port", server.get_port()))) { + LOG_WARN("fail to create rs job DELETE_SERVER", KR(ret)); } else if (OB_FAIL(st_operator_.update_status(server, new_server_status.get_display_status(), new_server_status.last_hb_time_, trans))) { - LOG_WARN("st_operator update_status failed", K(server), K(new_server_status), K(ret)); + LOG_WARN("st_operator update_status failed", K(server), K(new_server_status), K(ret)); } else { const ObServerStatus::ServerAdminStatus status = 
ObServerStatus::OB_SERVER_ADMIN_DELETING; const bool remove = false; @@ -362,17 +364,16 @@ int ObServerManager::end_delete_server(const ObAddr &server, const ObZone &zone, // complete the job char ip_buf[common::MAX_IP_ADDR_LENGTH]; (void)server.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - ObRsJobInfo job_info; - ret = RS_JOB_FIND(job_info, trans, "job_type", "DELETE_SERVER", - "job_status", "INPROGRESS", + int64_t job_id = 0; + ret = RS_JOB_FIND(DELETE_SERVER, job_id, trans, "svr_ip", ip_buf, "svr_port", server.get_port()); - if (OB_SUCC(ret) && job_info.job_id_ > 0) { + if (OB_SUCC(ret) && job_id > 0) { int tmp_ret = commit ? OB_SUCCESS : OB_CANCELED; - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, tmp_ret, trans))) { + if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) { LOG_WARN("all_rootservice_job update failed" , K(ret), K(server)); } } else { - LOG_WARN("failed to find job", K(ret), K(server)); + LOG_WARN("failed to find job", K(ret), K(server), K(job_id)); } } if (OB_SUCC(ret)) { diff --git a/src/rootserver/ob_server_zone_op_service.cpp b/src/rootserver/ob_server_zone_op_service.cpp index 65d6eece6..4377e634e 100644 --- a/src/rootserver/ob_server_zone_op_service.cpp +++ b/src/rootserver/ob_server_zone_op_service.cpp @@ -520,6 +520,7 @@ int ObServerZoneOpService::delete_server_( const int64_t now = ObTimeUtility::current_time(); char ip[OB_MAX_SERVER_ADDR_SIZE] = ""; ObMySQLTransaction trans; + int64_t job_id = 0; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K(is_inited_)); @@ -542,18 +543,19 @@ int ObServerZoneOpService::delete_server_( } else if (OB_UNLIKELY(server_info_in_table.is_deleting())) { ret = OB_SERVER_ALREADY_DELETED; LOG_WARN("the server has been deleted", KR(ret), K(server_info_in_table)); - } else { - int64_t job_id = RS_JOB_CREATE(DELETE_SERVER, trans, "svr_ip", ip, "svr_port", server.get_port()); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed ", 
K(ret)); - } else if (OB_FAIL(ObServerTableOperator::update_status( - trans, - server, - server_info_in_table.get_status(), - ObServerStatus::OB_SERVER_DELETING))) { - LOG_WARN("fail to update status", KR(ret), K(server), K(server_info_in_table)); - } + } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET( + job_id, + JOB_TYPE_DELETE_SERVER, + trans, + "svr_ip", ip, + "svr_port", server.get_port()))) { + LOG_WARN("fail to create rs job DELETE_SERVER", KR(ret)); + } else if (OB_FAIL(ObServerTableOperator::update_status( + trans, + server, + server_info_in_table.get_status(), + ObServerStatus::OB_SERVER_DELETING))) { + LOG_WARN("fail to update status", KR(ret), K(server), K(server_info_in_table)); } (void) end_trans_and_on_server_change_(ret, trans, "delete_server", server, server_info_in_table.get_zone(), now); return ret; @@ -586,13 +588,12 @@ int ObServerZoneOpService::check_and_end_delete_server_( LOG_ERROR("server is not in deleting status, cannot be removed from __all_server table", KR(ret), K(server_info)); } else { - ObRsJobInfo job_info; - ret = RS_JOB_FIND(job_info, trans, "job_type", "DELETE_SERVER", - "job_status", "INPROGRESS", + int64_t job_id = 0; + ret = RS_JOB_FIND(DELETE_SERVER, job_id, trans, "svr_ip", ip, "svr_port", server.get_port()); - if (OB_SUCC(ret) && job_info.job_id_ > 0) { + if (OB_SUCC(ret) && job_id > 0) { int tmp_ret = is_cancel ? 
OB_CANCELED : OB_SUCCESS; - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, tmp_ret, trans))) { + if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) { LOG_WARN("fail to all_rootservice_job" , KR(ret), K(server)); } } else { diff --git a/src/rootserver/ob_shrink_resource_pool_checker.cpp b/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp similarity index 60% rename from src/rootserver/ob_shrink_resource_pool_checker.cpp rename to src/rootserver/ob_shrink_expand_resource_pool_checker.cpp index f4a9e4f07..cece74efc 100644 --- a/src/rootserver/ob_shrink_resource_pool_checker.cpp +++ b/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp @@ -11,7 +11,7 @@ */ #define USING_LOG_PREFIX RS -#include "ob_shrink_resource_pool_checker.h" +#include "ob_shrink_expand_resource_pool_checker.h" #include "lib/container/ob_array.h" #include "ob_unit_manager.h" #include "ob_root_utils.h"//get_tenant_id @@ -29,20 +29,20 @@ using namespace share::schema; namespace rootserver { -int ObShrinkResourcePoolChecker::check_stop() const +int ObShrinkExpandResourcePoolChecker::check_stop() const { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret), K(is_inited_)); + LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret), K(is_inited_)); } else if (is_stop_) { ret = OB_CANCELED; - LOG_WARN("ObShrinkResourcePoolChecker stopped", KR(ret), K(is_stop_)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stopped", KR(ret), K(is_stop_)); } else {} // do nothing return ret; } -int ObShrinkResourcePoolChecker::init( +int ObShrinkExpandResourcePoolChecker::init( share::schema::ObMultiVersionSchemaService *schema_service, rootserver::ObUnitManager *unit_mgr, share::ObLSTableOperator &lst_operator, @@ -51,7 +51,7 @@ int ObShrinkResourcePoolChecker::init( int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret), K(is_inited_)); + 
LOG_WARN("ObShrinkExpandResourcePoolChecker init twice", KR(ret), K(is_inited_)); } else if (OB_ISNULL(schema_service) || OB_ISNULL(unit_mgr)) { ret = OB_INVALID_ARGUMENT; @@ -66,16 +66,16 @@ int ObShrinkResourcePoolChecker::init( return ret; } -int ObShrinkResourcePoolChecker::check() +int ObShrinkExpandResourcePoolChecker::check() { int ret = OB_SUCCESS; LOG_INFO("start check shrink resource pool"); ObArray tenant_ids; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret)); } else if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkResourcePoolChecker stop", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); } else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) { LOG_WARN("fail to get tenant id array", KR(ret)); } else { @@ -89,6 +89,7 @@ int ObShrinkResourcePoolChecker::check() } else if (is_meta_tenant(tenant_id)) { //nothing TODO } else { + ObCurTraceId::init(GCONF.self_addr_); LOG_INFO("start check shrink resource pool", K(tenant_id)); if (OB_TMP_FAIL(check_shrink_resource_pool_finished_by_tenant_(tenant_id))) { LOG_WARN("fail to check shrink resource pool finish", KR(ret), KR(tmp_ret), K(tenant_id)); @@ -100,21 +101,22 @@ int ObShrinkResourcePoolChecker::check() return ret; } -int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_tenant_( +int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_tenant_( const uint64_t tenant_id) { int ret = OB_SUCCESS; ObArray pool_ids; bool in_shrinking = true; bool is_finished = true; + int check_ret = OB_NEED_WAIT; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret)); } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), 
K(tenant_id)); } else if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkResourcePoolChecker stop", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ptr is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_)); @@ -126,15 +128,21 @@ int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_tenant_( } else if (OB_FAIL(unit_mgr_->check_pool_in_shrinking(pool_ids.at(0), in_shrinking))) { LOG_WARN("failed to check resource pool in shrink", KR(ret), K(pool_ids)); } else if (!in_shrinking) { - //nothing todo + // check if there exists expand task + // if exists, then check whether this task can be committed and commit it if ls is balanced + // if not exists, return OB_SUCCESS + if (OB_FAIL(check_and_commit_expand_resource_pool_(tenant_id))) { + LOG_WARN("fail to execute check_and_commit_expand_resource_pool_", KR(ret), K(tenant_id)); + } } else { //check shrink finish //get all unit and server - //check ls not in the unit and ls_meta not in the server ObArray units; ObArray servers; ObArray unit_ids; ObArray unit_group_ids; + int tmp_ret = OB_SUCCESS; + int64_t job_id = 0; for (int64_t i = 0; OB_SUCC(ret) && i < pool_ids.count(); ++i) { units.reset(); if (OB_FAIL(unit_mgr_->get_deleting_units_of_pool(pool_ids.at(i), units))) { @@ -143,21 +151,37 @@ int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_tenant_( LOG_WARN("failed to extract units server and ids", KR(ret), K(units)); } }//end for get all unit group, units, server + // find the corresponding rs job at first, then check if we can complete it + // if we only find the rs job at the committing period, + // we do not know whether the job has been changed during checking process + // e.g. 
job 1 is the rs job before checking, + // right after checking, job 2 is created and job 1 is canceled by job 2, + // then committing process will find job 2 and complete job 2 immediately, + // which means, job 2 is completed without checking. + if (OB_SUCC(ret) && OB_TMP_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, *sql_proxy_))) { + // even if there is no corresponding rs job, we should do complete job (e.g. remove deleting unit in __all_unit table) + if (OB_ENTRY_NOT_EXIST == tmp_ret) { + FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] there exists unit_num changing without corresponding rs job", + KR(ret), K(tmp_ret), K(tenant_id)); + } else { + LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), KR(tmp_ret), K(tenant_id)); + } + } if (FAILEDx(check_shrink_resource_pool_finished_by_ls_(tenant_id, - servers, unit_ids, unit_group_ids, is_finished))) { + servers, unit_ids, unit_group_ids, is_finished, check_ret))) { LOG_WARN("failed to check shrink by ls", KR(ret), K(servers), K(unit_ids), K(unit_group_ids)); } if (OB_SUCC(ret) && is_finished) { - //commit finish of the tenant - if (OB_FAIL(commit_tenant_shrink_resource_pool_(tenant_id))) { - LOG_WARN("failed to shrink tenant resource pool", KR(ret), K(tenant_id)); + // commit finish of the tenant + if (OB_FAIL(commit_tenant_shrink_resource_pool_(tenant_id, job_id, check_ret))) { // shrink + LOG_WARN("failed to shrink tenant resource pool", KR(ret), K(tenant_id), K(job_id), K(check_ret)); } } } return ret; } -int ObShrinkResourcePoolChecker::extract_units_servers_and_ids_( +int ObShrinkExpandResourcePoolChecker::extract_units_servers_and_ids_( const ObIArray &units, ObIArray &servers, ObIArray &unit_ids, @@ -182,29 +206,37 @@ int ObShrinkResourcePoolChecker::extract_units_servers_and_ids_( return ret; } -int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_ls_( +int 
ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_ls_( const uint64_t tenant_id, const ObIArray &servers, const ObIArray &unit_ids, const ObIArray &unit_group_ids, - bool &is_finished) + bool &is_finished, + int &check_ret) { int ret = OB_SUCCESS; is_finished = true; + check_ret = OB_NEED_WAIT; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret)); } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || 0 == servers.count() || 0 == unit_ids.count() || 0 == unit_group_ids.count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(servers), K(unit_ids), K(unit_group_ids)); } else if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkResourcePoolChecker stop", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_) || OB_ISNULL(lst_operator_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ptr is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_), KP(lst_operator_)); + } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); + } else if (OB_NEED_WAIT == check_ret) { + is_finished = false; } else { + // check ls meta table for shrinking + // to make sure that all the ls in deleting units have been migrated to some other normal units ObLSStatusInfoArray ls_status_array; ObLSStatusOperator ls_status_op; if (OB_FAIL(ls_status_op.get_all_tenant_related_ls_status_info( @@ -216,7 +248,7 @@ int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_ls_( share::ObLSInfo ls_info; int64_t ls_replica_cnt = 0; if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkResourcePoolChecker stopped", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stopped", KR(ret)); } else if (has_exist_in_array(unit_group_ids, 
ls_status_info.unit_group_id_)) { is_finished = false; LOG_INFO("has ls in the unit group", KR(ret), K(ls_status_info)); @@ -244,25 +276,75 @@ int ObShrinkResourcePoolChecker::check_shrink_resource_pool_finished_by_ls_( return ret; } -int ObShrinkResourcePoolChecker::commit_tenant_shrink_resource_pool_( - const uint64_t tenant_id) +int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_( + const uint64_t tenant_id, + const int64_t job_id, + const int check_ret) { int ret = OB_SUCCESS; + DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("ObShrinkResourcePoolChecker not init", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker not init", KR(ret)); } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkResourcePoolChecker stop", KR(ret)); + LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); } else if (OB_ISNULL(unit_mgr_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ptr is null", KR(ret), KP(unit_mgr_)); - } else if (OB_FAIL(unit_mgr_->commit_shrink_tenant_resource_pool(tenant_id))) { - LOG_WARN("fail to shrink resource pool", KR(ret), K(tenant_id)); + } else if (OB_FAIL(unit_mgr_->commit_shrink_tenant_resource_pool(tenant_id, job_id, check_ret))) { + LOG_WARN("fail to shrink resource pool", KR(ret), K(tenant_id), K(job_id), K(check_ret)); } else {} // no more to do return ret; } + +int ObShrinkExpandResourcePoolChecker::check_and_commit_expand_resource_pool_(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + int64_t check_job_id = 0; + int check_ret = OB_NEED_WAIT; + if (OB_ISNULL(sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_proxy_ is null", KR(ret), KP(sql_proxy_)); + } else if (OB_FAIL(RS_JOB_FIND( + ALTER_RESOURCE_TENANT_UNIT_NUM, + check_job_id, + *sql_proxy_, + "tenant_id", tenant_id))) { + // find the corresponding rs job at first, 
then check if we can complete it + // if we only find the rs job at the committing period, + // we do not know whether the job has been changed during checking process + // e.g. job 1 is the rs job before checking, + // right after checking, job 2 is created and job 1 is canceled by job 2, + // then committing process will find job 2 and complete job 2 immediately, + // which means, job 2 is completed without checking. + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job", KR(ret), K(tenant_id)); + } + } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); + } else if (OB_NEED_WAIT != check_ret) { + DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM); + if (OB_FAIL(check_stop())) { + LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); + } else if (OB_SUCC(RS_JOB_COMPLETE(check_job_id, check_ret, *sql_proxy_))) { + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job EXPAND UNIT_NUM", + KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); + } else { + LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); + if (OB_EAGAIN == ret) { + FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job EXPAND UNIT_NUM might has " + "been already completed due to a new job or deleted in table manually", + KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); + ret = OB_SUCCESS; // no need to return the error code + } + } + } + return ret; +} } // end namespace rootserver } // end oceanbase diff --git a/src/rootserver/ob_shrink_resource_pool_checker.h b/src/rootserver/ob_shrink_expand_resource_pool_checker.h similarity index 80% rename from src/rootserver/ob_shrink_resource_pool_checker.h rename to src/rootserver/ob_shrink_expand_resource_pool_checker.h index b08f74fc8..c7817a832 100644 --- a/src/rootserver/ob_shrink_resource_pool_checker.h +++ 
b/src/rootserver/ob_shrink_expand_resource_pool_checker.h @@ -10,8 +10,8 @@ * See the Mulan PubL v2 for more details. */ -#ifndef OCEANBASE_ROOTSERVER_OB_SHRINK_RESOURCE_POOL_CHECKER_ -#define OCEANBASE_ROOTSERVER_OB_SHRINK_RESOURCE_POOL_CHECKER_ +#ifndef OCEANBASE_ROOTSERVER_OB_SHRINK_EXPAND_RESOURCE_POOL_CHECKER_ +#define OCEANBASE_ROOTSERVER_OB_SHRINK_EXPAND_RESOURCE_POOL_CHECKER_ #include "share/ob_define.h" #include "ob_root_utils.h" @@ -39,17 +39,17 @@ class DRLSInfo; class ObUnitManager; class ObServerManager; class ObZoneManager; -class ObShrinkResourcePoolChecker : public share::ObCheckStopProvider +class ObShrinkExpandResourcePoolChecker : public share::ObCheckStopProvider { public: - ObShrinkResourcePoolChecker(volatile bool &is_stop) + ObShrinkExpandResourcePoolChecker(volatile bool &is_stop) : is_stop_(is_stop), sql_proxy_(NULL), unit_mgr_(NULL), schema_service_(NULL), lst_operator_(NULL), is_inited_(false) {} - virtual ~ObShrinkResourcePoolChecker() {} + virtual ~ObShrinkExpandResourcePoolChecker() {} public: int init( share::schema::ObMultiVersionSchemaService *schema_service, @@ -72,9 +72,13 @@ private: const ObIArray &servers, const ObIArray &unit_ids, const ObIArray &unit_group_ids, - bool &is_finished); + bool &is_finished, + int &check_ret); int commit_tenant_shrink_resource_pool_( - const uint64_t tenant_id); + const uint64_t tenant_id, + const int64_t job_id, + const int check_ret); + int check_and_commit_expand_resource_pool_(const uint64_t tenant_id); private: const volatile bool &is_stop_; common::ObMySQLProxy *sql_proxy_; diff --git a/src/rootserver/ob_tenant_balance_service.cpp b/src/rootserver/ob_tenant_balance_service.cpp index 0ada3c52a..4759f4495 100755 --- a/src/rootserver/ob_tenant_balance_service.cpp +++ b/src/rootserver/ob_tenant_balance_service.cpp @@ -100,8 +100,8 @@ void ObTenantBalanceService::do_work() //check ls status is match with __all_ls //TODO if (ObShareUtil::is_tenant_enable_transfer(tenant_id_)) { - if 
(OB_FAIL(gather_ls_status_stat_())) { - LOG_WARN("failed to gather ls status", KR(ret)); + if (OB_FAIL(gather_ls_status_stat(tenant_id_, ls_array_))) { + LOG_WARN("failed to gather ls status", KR(ret), K(tenant_id_), K(ls_array_)); } else if (OB_FAIL(ls_balance_(job_cnt))) { LOG_WARN("failed to do ls balance", KR(ret)); } else if (0 == job_cnt) { @@ -114,8 +114,10 @@ void ObTenantBalanceService::do_work() if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema_copy))) { LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_)); } else { + bool is_balanced = false; + bool need_execute_balance = true; ObTenantLSInfo tenant_info(GCTX.sql_proxy_, &tenant_schema_copy, tenant_id_); - if (OB_FAIL(ObLSServiceHelper::balance_ls_group(tenant_info))) { + if (OB_FAIL(ObLSServiceHelper::balance_ls_group(need_execute_balance, tenant_info, is_balanced))) { LOG_WARN("failed to balance ls group", KR(ret)); } } @@ -151,6 +153,40 @@ void ObTenantBalanceService::do_work() } } +int ObTenantBalanceService::gather_stat_primary_zone_num_and_units( + const uint64_t &tenant_id, + int64_t &primary_zone_num, + ObIArray &unit_group_array) +{ + int ret = OB_SUCCESS; + unit_group_array.reset(); + primary_zone_num = 0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_) || OB_ISNULL(GCTX.schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_), KP(GCTX.schema_service_)); + } else { + //get primary zone + share::schema::ObTenantSchema tenant_schema; + ObArray primary_zone; + if (OB_FAIL(get_tenant_schema(tenant_id, tenant_schema))) { + LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id)); + } else if (!tenant_schema.is_normal()) { + //already wait tenant ready, must be normal + ret = OB_ERR_UNEXPECTED; + WSTAT("tenant schema not ready is unexpected", KR(ret)); + } else if 
(OB_FAIL(ObLSServiceHelper::get_primary_zone_unit_array(&tenant_schema, + primary_zone, unit_group_array))) { + LOG_WARN("failed to get primary zone unit array", KR(ret), K(tenant_schema)); + } else { + primary_zone_num = primary_zone.count(); + } + } + return ret; +} + int ObTenantBalanceService::gather_stat_() { int ret = OB_SUCCESS; @@ -158,37 +194,24 @@ int ObTenantBalanceService::gather_stat_() if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret)); - } else if (OB_ISNULL(GCTX.sql_proxy_) - || OB_ISNULL(GCTX.schema_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_), KP(GCTX.schema_service_)); + } else if (OB_FAIL(gather_stat_primary_zone_num_and_units( + tenant_id_, + primary_zone_num_, + unit_group_array_))) { + LOG_WARN("fail to execute gather_stat_primary_zone_num_and_units", KR(ret), K(tenant_id_)); } else { - //get primary zone - share::schema::ObTenantSchema tenant_schema; - ObArray primary_zone; - if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) { - LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_)); - } else if (!tenant_schema.is_normal()) { - //already wait tenant ready, must be normal - ret = OB_ERR_UNEXPECTED; - WSTAT("tenant schema not ready is unexpected", KR(ret)); - } else if (OB_FAIL(ObLSServiceHelper::get_primary_zone_unit_array(&tenant_schema, - primary_zone, unit_group_array_))) { - LOG_WARN("failed to get primary zone unit array", KR(ret), K(tenant_schema)); - } else { - primary_zone_num_ = primary_zone.count(); - ATOMIC_SET(&loaded_, true); - } + ATOMIC_SET(&loaded_, true); } return ret; } -int ObTenantBalanceService::gather_ls_status_stat_() +int ObTenantBalanceService::gather_ls_status_stat(const uint64_t &tenant_id, share::ObLSStatusInfoArray &ls_array) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); + ls_array.reset(); + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = 
OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); } else if (OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_)); @@ -196,25 +219,25 @@ int ObTenantBalanceService::gather_ls_status_stat_() //get ls status info //must remove ls group id = 0, those ls no need balance, such as sys ls and duplicate ls ObLSStatusOperator status_op; - ObLSAttrOperator ls_op(tenant_id_, GCTX.sql_proxy_); + ObLSAttrOperator ls_op(tenant_id, GCTX.sql_proxy_); share::ObLSAttrArray ls_attr_array; - if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id_, ls_array_, *GCTX.sql_proxy_))) { - LOG_WARN("failed to get status by order", KR(ret), K(tenant_id_)); + if (OB_FAIL(status_op.get_all_ls_status_by_order(tenant_id, ls_array, *GCTX.sql_proxy_))) { + LOG_WARN("failed to get status by order", KR(ret), K(tenant_id)); } else if (OB_FAIL(ls_op.get_all_ls_by_order(ls_attr_array))) { LOG_WARN("failed to get ls attr array", KR(ret)); - } else if (ls_attr_array.count() > ls_array_.count()) { + } else if (ls_attr_array.count() > ls_array.count()) { //only ls status has more ls, such as some ls is waitoffline ret = OB_NEED_WAIT; WSTAT("has ls need create", KR(ret), K(ls_attr_array)); } int64_t attr_index = ls_attr_array.count() - 1; bool need_remove_ls = false; - for (int64_t i = ls_array_.count() - 1; OB_SUCC(ret) && i >= 0; --i) { + for (int64_t i = ls_array.count() - 1; OB_SUCC(ret) && i >= 0; --i) { if (attr_index < 0) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls attr array is unexpected", KR(ret), K(i), K(ls_attr_array)); } else { - const ObLSStatusInfo &status_info = ls_array_.at(i); + const ObLSStatusInfo &status_info = ls_array.at(i); const ObLSAttr &ls_info = ls_attr_array.at(attr_index); need_remove_ls = false; if (status_info.ls_id_ == ls_info.get_ls_id()) { @@ -241,7 +264,7 @@ int ObTenantBalanceService::gather_ls_status_stat_() if (!status_info.ls_is_wait_offline()) { ret = OB_ERR_UNEXPECTED; WSTAT("ls status 
not expected", KR(ret), K(status_info), K(ls_info), - K(ls_array_), K(ls_attr_array)); + K(ls_array), K(ls_attr_array)); } } else { // ls in status can not large than in __all_ls by order @@ -250,8 +273,8 @@ int ObTenantBalanceService::gather_ls_status_stat_() K(status_info)); } if (OB_SUCC(ret) && need_remove_ls) { - ISTAT("LS no need balance", "ls_status", ls_array_.at(i)); - if (OB_FAIL(ls_array_.remove(i))) { + ISTAT("LS no need balance", "ls_status", ls_array.at(i)); + if (OB_FAIL(ls_array.remove(i))) { LOG_WARN("failed to remvoe no ls group ls", KR(ret), K(i)); } } @@ -261,6 +284,108 @@ int ObTenantBalanceService::gather_ls_status_stat_() return ret; } +int ObTenantBalanceService::is_ls_balance_finished(const uint64_t &tenant_id, bool &is_finished) +{ + int ret = OB_SUCCESS; + bool is_primary = true; + is_finished = false; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant or not user tenant", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(ObAllTenantInfoProxy::is_primary_tenant(GCTX.sql_proxy_, tenant_id, is_primary))) { // OB_FAIL keeps the error code in ret so the failure is logged and returned instead of being reported as success + LOG_WARN("fail to execute is_primary_tenant", KR(ret), K(tenant_id)); + } else if (is_primary) { + if (OB_FAIL(is_primary_tenant_ls_balance_finished_(tenant_id, is_finished))) { + LOG_WARN("fail to execute is_primary_tenant_ls_balance_finished_", KR(ret), K(tenant_id)); + } + } else { + // standby & restore + if (OB_FAIL(is_standby_tenant_ls_balance_finished_(tenant_id, is_finished))) { + LOG_WARN("fail to execute is_standby_tenant_ls_balance_finished_", KR(ret), K(tenant_id)); + } + } + LOG_TRACE("check whether the tenant has balanced ls", K(ret), K(tenant_id), K(is_primary), K(is_finished)); + return ret; +} + +int ObTenantBalanceService::is_primary_tenant_ls_balance_finished_( + const uint64_t &tenant_id, + bool 
&is_finished) +{ + int ret = OB_SUCCESS; + int64_t job_cnt = 1; + int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; + ObBalanceJob job; + ObLSBalanceTaskHelper ls_balance_helper; + bool need_ls_balance = false; + int64_t primary_zone_num = 0; + share::ObLSStatusInfoArray ls_array; + ObArray unit_group_array; + is_finished = false; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant or not user tenant", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( + tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + job_cnt = 0; + } else { + LOG_WARN("fail to get balance job", KR(ret), K(tenant_id)); + } + } + if (OB_FAIL(ret)) { + } else if (0 != job_cnt) { + is_finished= false; + } else if (OB_FAIL(gather_ls_status_stat(tenant_id, ls_array))) { + LOG_WARN("fail to execute gather_ls_status_stat", KR(ret), K(tenant_id)); + } else if (OB_FAIL(gather_stat_primary_zone_num_and_units(tenant_id, primary_zone_num, unit_group_array))) { + LOG_WARN("fail to execute gather_stat_primary_zone_num_and_units", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ls_balance_helper.init( + tenant_id, ls_array, unit_group_array, primary_zone_num, GCTX.sql_proxy_))) { + LOG_WARN("failed to init ls balance helper", KR(ret), K(ls_array), K(unit_group_array), + K(primary_zone_num), K(tenant_id)); + } else if (OB_FAIL(ls_balance_helper.check_need_ls_balance(need_ls_balance))) { + LOG_WARN("failed to check_ls need balance", KR(ret)); + } else { + is_finished = !need_ls_balance; + } + LOG_INFO("check whether the primary_tenant has balanced ls", KR(ret), K(tenant_id), K(ls_array), + K(primary_zone_num), K(unit_group_array), K(need_ls_balance)); + return ret; 
+} + +int ObTenantBalanceService::is_standby_tenant_ls_balance_finished_( + const uint64_t &tenant_id, + bool &is_finished) +{ + int ret = OB_SUCCESS; + ObTenantSchema tenant_schema; + is_finished = false; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant or not user tenant", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObTenantThreadHelper::get_tenant_schema(tenant_id, tenant_schema))) { + LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id)); + } else { + ObTenantLSInfo tenant_info(GCTX.sql_proxy_, &tenant_schema, tenant_id); + bool need_execute_balance = false; + if (OB_FAIL(ObLSServiceHelper::balance_ls_group(need_execute_balance, tenant_info, is_finished))) { + LOG_WARN("failed to balance ls group", KR(ret), K(tenant_info)); + } + LOG_INFO("check whether the non_primary_tenant has balanced ls", KR(ret), K(tenant_id), K(tenant_info)); + } + return ret; +} int ObTenantBalanceService::try_process_current_job(int64_t &job_cnt) { diff --git a/src/rootserver/ob_tenant_balance_service.h b/src/rootserver/ob_tenant_balance_service.h index a1eb7192c..fa211bf76 100644 --- a/src/rootserver/ob_tenant_balance_service.h +++ b/src/rootserver/ob_tenant_balance_service.h @@ -74,12 +74,18 @@ public: UNUSED(lsn); return OB_SUCCESS; } + static int gather_stat_primary_zone_num_and_units( + const uint64_t &tenant_id, + int64_t &primary_zone_num, + ObIArray &unit_group_array); + static int gather_ls_status_stat(const uint64_t &tenant_id, share::ObLSStatusInfoArray &ls_array); + static int is_ls_balance_finished(const uint64_t &tenant_id, bool &is_finished); private: + static int is_primary_tenant_ls_balance_finished_(const uint64_t &tenant_id, bool &is_finished); + static int is_standby_tenant_ls_balance_finished_(const uint64_t &tenant_id, bool 
&is_finished); //load current unit group and primary zone int gather_stat_(); - //load __all_ls_status - int gather_ls_status_stat_(); //process current job int try_process_current_job(int64_t &job_cnt); //accordint to primary_zone and unit group diff --git a/src/rootserver/ob_unit_manager.cpp b/src/rootserver/ob_unit_manager.cpp index bb8bc154a..3757c59e8 100644 --- a/src/rootserver/ob_unit_manager.cpp +++ b/src/rootserver/ob_unit_manager.cpp @@ -1468,6 +1468,67 @@ int ObUnitManager::determine_alter_resource_tenant_unit_num_type( return ret; } +int ObUnitManager::register_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + const int64_t new_unit_num, + const AlterUnitNumType alter_unit_num_type, + common::ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + uint64_t tenant_data_version = 0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) + || !(AUN_SHRINK == alter_unit_num_type || AUN_EXPAND == alter_unit_num_type))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(alter_unit_num_type)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_WARN("fail to get min data version", KR(ret), K(tenant_id)); + } else if (tenant_data_version < DATA_VERSION_4_2_1_0) { + if (AUN_EXPAND == alter_unit_num_type) { + // skip, no rs job for expand task in version 4.1 + // we do not need to rollback rs job, since in 4.1, there is no inprogress rs job at this step + } else if (OB_FAIL(register_shrink_tenant_pool_unit_num_rs_job(tenant_id, new_unit_num, trans))) { + LOG_WARN("fail to execute register_shrink_tenant_pool_unit_num_rs_job", KR(ret), + K(tenant_id), K(new_unit_num)); + } + } else { // tenant_data_version >= DATA_VERSION_4_2_1_0 + // step 1: cancel rs job if exists + int64_t job_id = 0; + if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id)); + (void) 
print_user_error_(tenant_id); + } else if(OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) { + LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id)); + } else { + ret = create_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, job_id, trans); + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] create a new rs job", "type", + AUN_EXPAND == alter_unit_num_type ? "EXPAND UNIT_NUM" : "SHRINK UNIT_NUM", + KR(ret), K(tenant_id), K(job_id), K(alter_unit_num_type)); + } + } + return ret; +} + +int ObUnitManager::find_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + int64_t &job_id, + common::ObISQLClient &sql_proxy) +{ + int ret = OB_SUCCESS; + ret = RS_JOB_FIND( + ALTER_RESOURCE_TENANT_UNIT_NUM, + job_id, + sql_proxy, + "tenant_id", tenant_id); + if (OB_ENTRY_NOT_EXIST == ret) { + ret = RS_JOB_FIND( + SHRINK_RESOURCE_TENANT_UNIT_NUM, // only created in version < 4.2 + job_id, + sql_proxy, + "tenant_id", tenant_id); + } + return ret; +} int ObUnitManager::register_shrink_tenant_pool_unit_num_rs_job( const uint64_t tenant_id, const int64_t new_unit_num, @@ -1475,44 +1536,136 @@ int ObUnitManager::register_shrink_tenant_pool_unit_num_rs_job( { int ret = OB_SUCCESS; int64_t pos = 0; - const int64_t extra_info_len = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH; - char extra_info[common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH] = {0}; - if (OB_SUCCESS != (ret = databuff_printf(extra_info, extra_info_len, pos, - "new_unit_num: %ld", new_unit_num))) { - if (OB_SIZE_OVERFLOW == ret) { - LOG_WARN("format to buff size overflow", K(ret)); - } else { - LOG_WARN("format new unit num failed", K(ret)); - } - } - if (OB_SUCC(ret)) { - int64_t job_id = RS_JOB_CREATE(SHRINK_RESOURCE_TENANT_UNIT_NUM, trans, - "tenant_id", tenant_id, - "extra_info", extra_info); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed", K(ret)); - } + int64_t job_id = 
0; + if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id)); + (void) print_user_error_(tenant_id); + } else { + ret = create_alter_resource_tenant_unit_num_rs_job( + tenant_id, + new_unit_num, + job_id, + trans, + JOB_TYPE_SHRINK_RESOURCE_TENANT_UNIT_NUM); + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] create a new rs job in Version < 4.2", + KR(ret), K(tenant_id), K(job_id)); } return ret ; } -int ObUnitManager::rollback_shrink_tenant_pool_unit_num_rs_job( - const uint64_t tenant_id, - common::ObMySQLTransaction &trans) +void ObUnitManager::print_user_error_(const uint64_t tenant_id) { - ObRsJobInfo job_info; - int ret = RS_JOB_FIND(job_info, trans, - "job_type", "SHRINK_RESOURCE_TENANT_UNIT_NUM", - "job_status", "INPROGRESS", - "tenant_id", tenant_id); - if (OB_SUCC(ret) && job_info.job_id_ > 0) { - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, -1, trans))) { // Roll back, this shrink failed - LOG_WARN("update all_rootservice_job failed", K(ret), K(job_info)); + const int64_t ERR_MSG_LEN = 256; + char err_msg[ERR_MSG_LEN]; + int ret = OB_SUCCESS; + int64_t pos = 0; + if (OB_FAIL(databuff_printf(err_msg, ERR_MSG_LEN, pos, + "Tenant (%lu) 'enable_rebalance' is disabled, alter tenant unit num", tenant_id))) { + if (OB_SIZE_OVERFLOW == ret) { + LOG_WARN("format to buff size overflow", KR(ret)); + } else { + LOG_WARN("format new unit num failed", KR(ret)); } } else { - LOG_WARN("failed to find rs job", K(ret), "tenant_id", tenant_id); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, err_msg); } +} + +int ObUnitManager::cancel_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + common::ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + int64_t job_id = 0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); + } else if 
(OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, trans))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] there is no rs job in table",KR(ret), K(tenant_id)); + } else { + LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id)); + } + } else { + ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] cancel an inprogress rs job", KR(ret), + K(tenant_id), K(job_id)); + } + return ret; +} + +int ObUnitManager::create_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + const int64_t new_unit_num, + int64_t &job_id, + common::ObMySQLTransaction &trans, + ObRsJobType job_type) +{ + int ret = OB_SUCCESS; + job_id = 0; + const int64_t extra_info_len = common::MAX_ROOTSERVICE_EVENT_EXTRA_INFO_LENGTH; + HEAP_VAR(char[extra_info_len], extra_info) { + memset(extra_info, 0, extra_info_len); + int64_t pos = 0; + share::schema::ObSchemaGetterGuard schema_guard; + const ObSimpleTenantSchema *tenant_schema; + if (OB_FAIL(databuff_printf(extra_info, extra_info_len, pos, "new_unit_num: %ld", new_unit_num))) { + if (OB_SIZE_OVERFLOW == ret) { + LOG_WARN("format to buff size overflow", K(ret)); + } else { + LOG_WARN("format new unit num failed", K(ret)); + } + } else if (OB_ISNULL(schema_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("schema service is null", KR(ret), KP(schema_service_)); + } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_schema))) { + LOG_WARN("fail to get tenant info", K(ret), K(tenant_id)); + } else if (OB_ISNULL(tenant_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tenant schema is null", KR(ret), KP(tenant_schema)); + } else if (OB_FAIL(RS_JOB_CREATE_WITH_RET( + job_id, + job_type, + trans, + 
"tenant_id", tenant_id, + "tenant_name", tenant_schema->get_tenant_name(), + "extra_info", extra_info))) { + LOG_WARN("fail to create rs job", KR(ret), K(tenant_id), K(job_type)); + } + } + return ret; +} + +int ObUnitManager::rollback_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + const int64_t new_unit_num, + common::ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + int64_t job_id = 0; + uint64_t tenant_data_version = 0; + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_WARN("fail to get min data version", KR(ret), K(tenant_id)); + } else if (tenant_data_version < DATA_VERSION_4_2_1_0) { + // in v4.1, it's allowed, since this will not track the number of logstreams + if (OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) { + LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job in v4.1", KR(ret), K(tenant_id)); + } + } else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id)); + (void) print_user_error_(tenant_id); + } else if (OB_FAIL(cancel_alter_resource_tenant_unit_num_rs_job(tenant_id, trans))) { + LOG_WARN("fail to execute cancel_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id)); + } else if (OB_FAIL(create_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, job_id, trans))) { + LOG_WARN("fail to execute create_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id), K(new_unit_num)); + } + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] rollback a SHRINK UNIT_NUM rs job", + KR(ret), K(tenant_id), K(job_id), K(new_unit_num)); return ret; } @@ -1521,31 +1674,55 @@ int ObUnitManager::try_complete_shrink_tenant_pool_unit_num_rs_job( common::ObMySQLTransaction &trans) { int ret = OB_SUCCESS; - if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, trans))) { + // this function is called by 
ObDDLService::drop_resource_pool_pre + const int check_ret = OB_SUCCESS; // no need to check ls status + int64_t job_id = 0; + if (OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, job_id, trans))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; } else { - LOG_WARN("complete_shrink_tenant_pool_unit_num_rs_job failed", KR(ret), "tenant_id" K(tenant_id)); + LOG_WARN("fail to execute find_alter_resource_tenant_unit_num_rs_job", KR(ret), K(tenant_id)); } - } else {} // do nothing + } else { + ret = complete_shrink_tenant_pool_unit_num_rs_job_( + tenant_id, job_id, check_ret, trans); + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job DROP_TENANT_FORCE", + KR(ret), K(tenant_id), K(job_id)); + } return ret; } int ObUnitManager::complete_shrink_tenant_pool_unit_num_rs_job_( const uint64_t tenant_id, + const int64_t job_id, + const int check_ret, common::ObMySQLTransaction &trans) { - ObRsJobInfo job_info; - int ret = RS_JOB_FIND(job_info, trans, - "job_type", "SHRINK_RESOURCE_TENANT_UNIT_NUM", - "job_status", "INPROGRESS", - "tenant_id", tenant_id); - if (OB_SUCC(ret) && job_info.job_id_ > 0) { - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, 0, trans))) { // job success - LOG_WARN("update all_rootservice_job failed", K(ret), K(job_info)); - } + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || job_id < 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id), K(job_id)); + } else if (OB_SUCC(RS_JOB_COMPLETE(job_id, check_ret, trans))) { + FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogressing rs job SHRINK UNIT_NUM", + KR(ret), K(tenant_id), K(job_id), K(check_ret)); } else { - LOG_WARN("failed to find rs job", K(ret), "tenant_id" K(tenant_id)); + LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret)); + if (OB_EAGAIN == ret) { + int64_t curr_job_id = 0; + if 
(OB_FAIL(find_alter_resource_tenant_unit_num_rs_job(tenant_id, curr_job_id, trans))) { + LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret)); + if (OB_ENTRY_NOT_EXIST == ret) { + FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job SHRINK UNIT_NUM might has " + "been already deleted in table manually", + KR(ret), K(tenant_id), K(job_id), K(check_ret)); + ret = OB_SUCCESS; + } + } else { + ret = OB_EAGAIN; + FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] a non-checked rs job SHRINK UNIT_NUM cannot be committed", + KR(ret), K(job_id), K(curr_job_id)); + } + } } return ret ; } @@ -1625,6 +1802,8 @@ int ObUnitManager::expand_tenant_pools_unit_num_( LOG_WARN("fail to generate new unit group id array", KR(ret), K(pools), K(new_unit_num)); } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { LOG_WARN("fail to start transaction", KR(ret)); + } else if (OB_FAIL(register_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, AUN_EXPAND, trans))) { + LOG_WARN("fail to register shrink tenant pool unit num rs job", KR(ret), K(tenant_id)); } else { share::ObResourcePool new_pool; for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) { @@ -1757,7 +1936,7 @@ int ObUnitManager::get_to_be_deleted_unit_group( } } else { ret = OB_OP_NOT_ALLOW; - LOG_USER_ERROR(OB_OP_NOT_ALLOW, "delete unit group which does not belong to this tenant"); + LOG_USER_ERROR(OB_OP_NOT_ALLOW, "delete unit group which is not belong to this tenant"); } } } else { @@ -1837,8 +2016,7 @@ int ObUnitManager::shrink_tenant_pools_unit_num( } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { LOG_WARN("fail to start transaction", KR(ret)); } else { - if (OB_FAIL(register_shrink_tenant_pool_unit_num_rs_job( - tenant_id, new_unit_num, trans))) { + if (OB_FAIL(register_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, AUN_SHRINK, trans))) { LOG_WARN("fail to register shrink tenant pool unit num rs job", KR(ret), K(tenant_id)); } 
else { share::ObResourcePool new_pool; @@ -1882,11 +2060,13 @@ int ObUnitManager::shrink_tenant_pools_unit_num( } } // however, we need to end this transaction - const bool commit = (OB_SUCCESS == ret); - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = trans.end(commit))) { - LOG_WARN("trans end failed", K(tmp_ret), K(commit)); - ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + if (trans.is_started()) { + const bool commit = (OB_SUCCESS == ret); + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = trans.end(commit))) { + LOG_WARN("trans end failed", K(tmp_ret), K(commit)); + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } } // modify in memory pool/unit info for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) { @@ -1932,8 +2112,8 @@ int ObUnitManager::rollback_tenant_shrink_pools_unit_num( } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { LOG_WARN("start transaction failed", K(ret)); } else { - if (OB_FAIL(rollback_shrink_tenant_pool_unit_num_rs_job(tenant_id, trans))) { - LOG_WARN("rollback rs_job failed ", K(ret)); + if (OB_FAIL(rollback_alter_resource_tenant_unit_num_rs_job(tenant_id, new_unit_num, trans))) { + LOG_WARN("rollback rs_job failed ", KR(ret), K(new_unit_num)); } else { share::ObResourcePool new_pool; for (int64_t i = 0; OB_SUCC(ret) && i < pools.count(); ++i) { @@ -2057,14 +2237,9 @@ int ObUnitManager::alter_resource_tenant( tenant_id, *pools, new_unit_num))) { LOG_WARN("fail to rollback shrink pool unit num", K(ret), K(new_unit_num)); } - } else if (!ObShareUtil::is_tenant_enable_rebalance(tenant_id)) { - ret = OB_OP_NOT_ALLOW; - LOG_WARN("enable_rebalance is disabled, modify tenant unit num not allowed", KR(ret), K(tenant_id)); - char err_msg[DEFAULT_BUF_LENGTH]; - (void)snprintf(err_msg, sizeof(err_msg), - "Tenant (%lu) 'enable_rebalance' is disabled, alter tenant unit num", tenant_id); - LOG_USER_ERROR(OB_OP_NOT_ALLOW, err_msg); } else if (AUN_EXPAND == alter_unit_num_type) { + // in 4.1, if enable_rebalance is 
false, this op can be executed successfully + // in 4.2, it will return OB_OP_NOT_ALLOW if (delete_unit_group_id_array.count() > 0) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "expand pool unit num combined with deleting unit"); @@ -2074,6 +2249,7 @@ int ObUnitManager::alter_resource_tenant( KPC(pools)); } } else if (AUN_SHRINK == alter_unit_num_type) { + // both 4.1 and 4.2 do not allow this op when enable_rebalance is false. if (OB_FAIL(shrink_tenant_pools_unit_num( tenant_id, *pools, new_unit_num, delete_unit_group_id_array))) { LOG_WARN("fail to shrink pool unit num", K(ret), K(new_unit_num)); @@ -4023,7 +4199,9 @@ int ObUnitManager::get_all_unit_infos_by_tenant(const uint64_t tenant_id, } int ObUnitManager::commit_shrink_tenant_resource_pool( - const uint64_t tenant_id) + const uint64_t tenant_id, + const int64_t job_id, + const int check_ret) { int ret = OB_SUCCESS; SpinWLockGuard guard(lock_); @@ -4044,8 +4222,8 @@ int ObUnitManager::commit_shrink_tenant_resource_pool( LOG_WARN("pool ptr is null", KR(ret), KP(pools)); } else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) { LOG_WARN("start transaction failed", KR(ret)); - } else if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, trans))) { - LOG_WARN("do rs_job failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(complete_shrink_tenant_pool_unit_num_rs_job_(tenant_id, job_id, check_ret, trans))) { + LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(job_id), K(check_ret)); } else if (OB_FAIL(commit_shrink_resource_pool_in_trans_(*pools, trans, resource_units))) { LOG_WARN("failed to shrink in trans", KR(ret), KPC(pools)); } @@ -6394,15 +6572,14 @@ int ObUnitManager::complete_migrate_unit_rs_job_in_pool( } else if (unit->is_manual_migrate()) { char ip_buf[common::MAX_IP_ADDR_LENGTH]; (void)unit->server_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - ObRsJobInfo job_info; - int tmp_ret = RS_JOB_FIND(job_info, *proxy_, "job_type", "MIGRATE_UNIT", - 
"job_status", "INPROGRESS", + int64_t job_id = 0; + int tmp_ret = RS_JOB_FIND(MIGRATE_UNIT, job_id, *proxy_, "unit_id", unit->unit_id_, "svr_ip", ip_buf, "svr_port", unit->server_.get_port()); - if (OB_SUCCESS == tmp_ret && job_info.job_id_ > 0) { - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, result_ret, trans))) { - LOG_WARN("all_rootservice_job update failed", K(ret), K(result_ret), K(job_info)); + if (OB_SUCCESS == tmp_ret && job_id > 0) { + if (OB_FAIL(RS_JOB_COMPLETE(job_id, result_ret, trans))) { + LOG_WARN("all_rootservice_job update failed", K(ret), K(result_ret), K(job_id)); } } } @@ -8304,11 +8481,17 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const } else if (is_manual) { char ip_buf[common::MAX_IP_ADDR_LENGTH]; (void)dst.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - const int64_t job_id = RS_JOB_CREATE(MIGRATE_UNIT, trans, "unit_id", unit_id, - "svr_ip", ip_buf, "svr_port", dst.get_port(), "tenant_id", pool->tenant_id_); - if (job_id < 1) { - ret = OB_SQL_OPT_ERROR; - LOG_WARN("insert into all_rootservice_job failed ", K(ret)); + int64_t job_id = 0; + if (OB_FAIL(RS_JOB_CREATE_WITH_RET( + job_id, + ObRsJobType::JOB_TYPE_MIGRATE_UNIT, + trans, + "unit_id", unit_id, + "svr_ip", ip_buf, + "svr_port", dst.get_port(), + "tenant_id", pool->tenant_id_))) { + LOG_WARN("fail to create rs job MIGRATE_UNIT", KR(ret), "tenant_id", pool->tenant_id_, + K(unit_id)); } else if (!granted) { if (OB_FAIL(RS_JOB_COMPLETE(job_id, OB_SUCCESS, trans))) { LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id)); @@ -8535,15 +8718,14 @@ int ObUnitManager::end_migrate_unit(const uint64_t unit_id, const EndMigrateOp e // complete the job if exists char ip_buf[common::MAX_IP_ADDR_LENGTH]; (void)unit_server.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); - ObRsJobInfo job_info; - tmp_ret = RS_JOB_FIND(job_info, trans, "job_type", "MIGRATE_UNIT", - "job_status", "INPROGRESS", "unit_id", unit_id, + int64_t job_id = 0; + tmp_ret = 
RS_JOB_FIND(MIGRATE_UNIT, job_id, trans, "unit_id", unit_id, "svr_ip", ip_buf, "svr_port", unit_server.get_port()); - if (OB_SUCCESS == tmp_ret && job_info.job_id_ > 0) { + if (OB_SUCCESS == tmp_ret && job_id > 0) { tmp_ret = (end_migrate_op == COMMIT) ? OB_SUCCESS : (end_migrate_op == REVERSE ? OB_CANCELED : OB_TIMEOUT); - if (OB_FAIL(RS_JOB_COMPLETE(job_info.job_id_, tmp_ret, trans))) { - LOG_WARN("all_rootservice_job update failed", K(ret), K(job_info)); + if (OB_FAIL(RS_JOB_COMPLETE(job_id, tmp_ret, trans))) { + LOG_WARN("all_rootservice_job update failed", K(ret), K(job_id)); } } else { //Can not find the situation, only the user manually opened will write rs_job diff --git a/src/rootserver/ob_unit_manager.h b/src/rootserver/ob_unit_manager.h index 45dfc60e5..fa1e7216b 100644 --- a/src/rootserver/ob_unit_manager.h +++ b/src/rootserver/ob_unit_manager.h @@ -171,6 +171,10 @@ public: const uint64_t tenant_id, const int64_t new_unit_num, const common::ObIArray &unit_group_id_array); + int find_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + int64_t &job_id, + common::ObISQLClient &sql_proxy); virtual int check_locality_for_logonly_unit(const share::schema::ObTenantSchema &tenant_schema, const common::ObIArray &pool_names, bool &is_permitted); @@ -228,7 +232,10 @@ public: common::ObIArray &unit_infos) const; virtual int get_deleting_units_of_pool(const uint64_t resource_pool_id, common::ObIArray &units) const; - virtual int commit_shrink_tenant_resource_pool(const uint64_t tenant_id); + virtual int commit_shrink_tenant_resource_pool( + const uint64_t tenant_id, + const int64_t job_id, + const int check_ret); virtual int get_all_unit_infos_by_tenant(const uint64_t tenant_id, common::ObIArray &unit_infos); virtual int get_unit_infos(const common::ObIArray &pools, @@ -478,15 +485,34 @@ private: const common::ObZone &zone, const common::ObReplicaType replica_type, common::ObIArray &unit_loads); + int register_alter_resource_tenant_unit_num_rs_job( 
+ const uint64_t tenant_id, + const int64_t new_unit_num, + const AlterUnitNumType alter_unit_num_type, + common::ObMySQLTransaction &trans); int register_shrink_tenant_pool_unit_num_rs_job( const uint64_t tenant_id, const int64_t new_unit_num, common::ObMySQLTransaction &trans); - int rollback_shrink_tenant_pool_unit_num_rs_job( + int rollback_alter_resource_tenant_unit_num_rs_job( const uint64_t tenant_id, + const int64_t new_unit_num, common::ObMySQLTransaction &trans); + + int cancel_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + common::ObMySQLTransaction &trans); + int create_alter_resource_tenant_unit_num_rs_job( + const uint64_t tenant_id, + const int64_t new_unit_num, + int64_t &job_id, + common::ObMySQLTransaction &trans, + ObRsJobType job_type = ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM); + int complete_shrink_tenant_pool_unit_num_rs_job_( const uint64_t tenant_id, + const int64_t job_id, + const int check_ret, common::ObMySQLTransaction &trans); int complete_migrate_unit_rs_job_in_pool( const int64_t resource_pool_id, @@ -972,6 +998,7 @@ private: } return str; } + void print_user_error_(const uint64_t tenant_id); private: bool inited_; diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 53f8a42e4..9a1b9bc2a 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -548,6 +548,9 @@ class ObString; ACT(STOP_PRIMARY_LS_THREAD,)\ ACT(TRANSFER_GET_BACKFILL_TABLETS_BEFORE,)\ ACT(STOP_LS_RECOVERY_THREAD,)\ + ACT(BEFORE_FINISH_PRIMARY_ZONE,)\ + ACT(BEFORE_FINISH_UNIT_NUM,)\ + ACT(BEFORE_CHECK_PRIMARY_ZONE,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_tenant_info_proxy.cpp b/src/share/ob_tenant_info_proxy.cpp index b449a4c82..37260a91b 100755 --- a/src/share/ob_tenant_info_proxy.cpp +++ b/src/share/ob_tenant_info_proxy.cpp @@ -225,7 +225,26 @@ int ObAllTenantInfoProxy::init_tenant_info( 
return ret; } - +int ObAllTenantInfoProxy::get_tenant_role( + ObISQLClient *proxy, + const uint64_t tenant_id, + ObTenantRole &tenant_role) +{ + int ret = OB_SUCCESS; + ObAllTenantInfo tenant_info; + if (OB_ISNULL(proxy)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("proxy is null", KR(ret), KP(proxy)); + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tenant_id is invalid", KR(ret), K(tenant_id)); + } else if (OB_FAIL(load_tenant_info(tenant_id, proxy, false /*for_update*/, tenant_info))) { + LOG_WARN("fail to load_tenant_info", KR(ret), K(tenant_id)); + } else { + tenant_role = tenant_info.get_tenant_role(); + } + return ret; +} int ObAllTenantInfoProxy::is_standby_tenant( ObISQLClient *proxy, const uint64_t tenant_id, @@ -233,46 +252,29 @@ int ObAllTenantInfoProxy::is_standby_tenant( { int ret = OB_SUCCESS; is_standby = false; - if (OB_ISNULL(proxy)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("proxy is null", KR(ret), KP(proxy)); - } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("tenant_id is invalid", KR(ret), K(tenant_id)); - } else if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) { - is_standby = false; + ObTenantRole tenant_role; + // the validity checking is in get_tenant_role + if (OB_FAIL(get_tenant_role(proxy, tenant_id, tenant_role))) { + LOG_WARN("fail to get tenant_role", KR(ret), K(tenant_id)); } else { - HEAP_VAR(ObMySQLProxy::MySQLResult, res) { - ObSqlString sql; - ObTimeoutCtx ctx; - common::sqlclient::ObMySQLResult *result = NULL; - uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) { - LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx)); - } else if (OB_FAIL(sql.assign_fmt("select tenant_role from %s where tenant_id = %lu", - OB_ALL_TENANT_INFO_TNAME, tenant_id))) { - LOG_WARN("failed to assign sql", KR(ret), K(sql)); - } else if (OB_FAIL(proxy->read(res, 
exec_tenant_id, sql.ptr()))) { - LOG_WARN("failed to read", KR(ret), K(exec_tenant_id), K(sql)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get sql result", KR(ret)); - } else if (OB_FAIL(result->next())) { - if (OB_ITER_END == ret) { - ret = OB_TENANT_NOT_EXIST; - LOG_WARN("tenant not exist", KR(ret), K(sql)); - } else { - LOG_WARN("fail to get next", KR(ret), K(sql)); - } - } else { - ObString tenant_role_str; - EXTRACT_VARCHAR_FIELD_MYSQL(*result, "tenant_role", tenant_role_str); - if (OB_SUCC(ret)) { - ObTenantRole tenant_role(tenant_role_str); - is_standby = tenant_role.is_standby(); - } - } - } // end HEAP_VAR + is_standby = tenant_role.is_standby(); + } + return ret; +} + +int ObAllTenantInfoProxy::is_primary_tenant( + ObISQLClient *proxy, + const uint64_t tenant_id, + bool &is_primary) +{ + int ret = OB_SUCCESS; + is_primary = false; + ObTenantRole tenant_role; + // the validity checking is in get_tenant_role + if (OB_FAIL(get_tenant_role(proxy, tenant_id, tenant_role))) { + LOG_WARN("fail to get tenant_role", KR(ret), K(tenant_id)); + } else { + is_primary = tenant_role.is_primary(); } return ret; } diff --git a/src/share/ob_tenant_info_proxy.h b/src/share/ob_tenant_info_proxy.h index 52bbd0786..4edb7ca87 100755 --- a/src/share/ob_tenant_info_proxy.h +++ b/src/share/ob_tenant_info_proxy.h @@ -162,9 +162,17 @@ public: static int init_tenant_info(const ObAllTenantInfo &tenant_info, ObISQLClient *proxy); static int is_standby_tenant( - ObISQLClient *proxy, - const uint64_t tenant_id, - bool &is_standby); + ObISQLClient *proxy, + const uint64_t tenant_id, + bool &is_standby); + static int is_primary_tenant( + ObISQLClient *proxy, + const uint64_t tenant_id, + bool &is_primary); + static int get_tenant_role( + ObISQLClient *proxy, + const uint64_t tenant_id, + ObTenantRole &tenant_role); static int get_primary_tenant_ids( ObISQLClient *proxy, diff --git 
a/unittest/rootserver/test_rs_job_table_operator.cpp b/unittest/rootserver/test_rs_job_table_operator.cpp index e5dd2da56..eb456a04f 100644 --- a/unittest/rootserver/test_rs_job_table_operator.cpp +++ b/unittest/rootserver/test_rs_job_table_operator.cpp @@ -126,7 +126,7 @@ TEST_F(TestRsJobTableOperator, test_api) // find job ASSERT_EQ(OB_SUCCESS, trans.start(&db_initer_.get_sql_proxy())); - ret = RS_JOB_FIND(job_info, trans, "job_type", "ALTER_TENANT_LOCALITY", "job_status", "INPROGRESS"); + ret = RS_JOB_FIND(ALTER_TENANT_LOCALITY, job_info, trans); ASSERT_EQ(OB_SUCCESS, ret); ASSERT_EQ(job_id1, job_info.job_id_); ASSERT_EQ(50, job_info.progress_); @@ -168,7 +168,7 @@ TEST_F(TestRsJobTableOperator, test_api) ASSERT_EQ(OB_TRANS_CTX_NOT_EXIST, ret); ret = RS_JOB_UPDATE(job_id1, trans, "extra_info", "slflskdjfoiw"); ASSERT_EQ(OB_TRANS_CTX_NOT_EXIST, ret); - ret = RS_JOB_FIND(job_info, trans, "job_type", "ALTER_TENANT_LOCALITY", "job_status", "INPROGRESS"); + ret = RS_JOB_FIND(ALTER_TENANT_LOCALITY, job_info, trans); ASSERT_EQ(OB_TRANS_CTX_NOT_EXIST, ret); ret = RS_JOB_COMPLETE(job_id1, 0, trans); ASSERT_EQ(OB_TRANS_CTX_NOT_EXIST, ret);