diff --git a/src/rootserver/ob_alter_locality_finish_checker.cpp b/src/rootserver/ob_alter_locality_finish_checker.cpp index 9ec94b3178..d63d0386cc 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.cpp +++ b/src/rootserver/ob_alter_locality_finish_checker.cpp @@ -26,7 +26,7 @@ using namespace common; using namespace share; namespace rootserver { -OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_, rs_job_id_, rs_job_check_ret_); +OB_SERIALIZE_MEMBER((ObCommitAlterTenantLocalityArg, ObDDLArg), tenant_id_); ObAlterLocalityFinishChecker::ObAlterLocalityFinishChecker(volatile bool &stop) : inited_(false), @@ -103,9 +103,7 @@ int ObAlterLocalityFinishChecker::check() DEBUG_SYNC(BEFORE_CHECK_LOCALITY); bool alter_locality_finish = false; bool meta_alter_locality_finish = false; - int check_ret = OB_NEED_WAIT; uint64_t tenant_id = OB_INVALID_TENANT_ID; - int64_t job_id = 0; ObCurTraceId::init(GCONF.self_addr_); if (OB_ISNULL(tenant_schemas.at(i)) || OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; @@ -116,26 +114,17 @@ int ObAlterLocalityFinishChecker::check() LOG_WARN("invalid tenant schema", KR(ret), "schema", tenant_schemas.at(i)); } else if (FALSE_IT(tenant_id = tenant_schemas.at(i)->get_tenant_id())) { // shall never be here - } else if (is_meta_tenant(tenant_id) - || tenant_schemas.at(i)->get_previous_locality_str().empty()) { + } else if (is_meta_tenant(tenant_id)) { continue; - } else if (OB_FAIL(find_rs_job(tenant_id, job_id, *GCTX.sql_proxy_))) { - // find the corresponding rs job at first, then check if we can complete it - // if we only find the rs job at the committing period, - // we do not know whether the job has been changed during checking process - // e.g. job 1 is the rs job before checking, - // right after checking, job 2 is created and job 1 is canceled by job 2, - // then committing process will find job 2 and complete job 2 immediately, - // which means, job 2 is completed without checking. - if (OB_ENTRY_NOT_EXIST == ret) { - FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] there exists locality changing without corresponding rs job", - KR(ret), KPC(tenant_schemas.at(i))); - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to find rs job", KR(ret), K(tenant_id)); + } else if (tenant_schemas.at(i)->get_previous_locality_str().empty()) { + // two possibilities + // 1. we cannot find any inprogress ALTER_TENANT_LOCALITY in __all_rootservice_job + // 2. there is an inprogress rs job ALTER_TENANT_LOCALITY, all tasks related to disaster recovery + // (e.g. add missing ls replicas, remove redundant ls replicas) have been finished + // the last thing to do is that check whether ls are balanced and complete the rs job if yes + if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY))) { + LOG_WARN("fail to check and commit rs job", KR(ret)); } - } - if (OB_FAIL(ret)) { } else if (OB_SUCCESS != (tmp_ret = ObDRWorker::check_tenant_locality_match( tenant_id, *unit_mgr_, @@ -150,18 +139,13 @@ int ObAlterLocalityFinishChecker::check() meta_alter_locality_finish))){ LOG_WARN("fail to check tenant locality match", KR(tmp_ret), "meta_tenant_id", gen_meta_tenant_id(tenant_id), K(meta_alter_locality_finish)); - } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { - LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); } else if (alter_locality_finish - && OB_NEED_WAIT != check_ret && (meta_alter_locality_finish || is_sys_tenant(tenant_id))) { DEBUG_SYNC(BEFORE_FINISH_LOCALITY); const int64_t timeout = GCONF.internal_sql_execute_timeout; // 30s default rootserver::ObCommitAlterTenantLocalityArg arg; arg.tenant_id_ = tenant_id; arg.exec_tenant_id_ = OB_SYS_TENANT_ID; - arg.rs_job_id_ = job_id; - arg.rs_job_check_ret_ = check_ret; if (OB_FAIL(check_stop())) { LOG_WARN("ObAlterLocalityFinishChecker stopped", KR(ret)); } else if (OB_SUCCESS != (tmp_ret = common_rpc_proxy_->to(self_).timeout(timeout).commit_alter_tenant_locality(arg))) { @@ -204,5 +188,6 @@ int ObAlterLocalityFinishChecker::check_stop() const } return ret; } + } // end namespace rootserver } // end namespace oceanbase diff --git a/src/rootserver/ob_alter_locality_finish_checker.h b/src/rootserver/ob_alter_locality_finish_checker.h index b6bb28fc15..7e30c14c5d 100644 --- a/src/rootserver/ob_alter_locality_finish_checker.h +++ b/src/rootserver/ob_alter_locality_finish_checker.h @@ -41,16 +41,11 @@ struct ObCommitAlterTenantLocalityArg : public obrpc::ObDDLArg { OB_UNIS_VERSION(1); public: - ObCommitAlterTenantLocalityArg() : - tenant_id_(common::OB_INVALID_ID), - rs_job_id_(0), - rs_job_check_ret_(OB_NEED_WAIT) {} + ObCommitAlterTenantLocalityArg() : tenant_id_(common::OB_INVALID_ID) {} bool is_valid() const { return common::OB_INVALID_ID != tenant_id_;} - TO_STRING_KV(K_(tenant_id), K_(rs_job_id), K_(rs_job_check_ret)); + TO_STRING_KV(K_(tenant_id)); uint64_t tenant_id_; - int64_t rs_job_id_; - int rs_job_check_ret_; }; class ObAlterLocalityFinishChecker : public share::ObCheckStopProvider @@ -66,14 +61,13 @@ public: ObUnitManager &unit_mgr, ObZoneManager &zone_mgr, common::ObMySQLProxy &sql_proxy, - share::ObLSTableOperator &lst_operator); + share::ObLSTableOperator &lst_operator); int check(); static int find_rs_job(const uint64_t tenant_id, int64_t &job_id, ObISQLClient &sql_proxy); private: //check whether this checker is stopped virtual int check_stop() const override; - private: bool inited_; share::schema::ObMultiVersionSchemaService *schema_service_; diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 050685115b..4091fe50a4 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -23453,29 +23453,6 @@ int ObDDLService::commit_alter_tenant_locality( LOG_WARN("fail to alter meta tenant", KR(ret)); } } - int return_code = arg.rs_job_check_ret_; - int64_t job_id = arg.rs_job_id_; - if (OB_FAIL(ret)) { - } else if (OB_SUCC(RS_JOB_COMPLETE(job_id, return_code, trans))) { - FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] complete an inprogress rs job", KR(ret), - K(arg), K(return_code)); - } else { - LOG_WARN("fail to complete rs job", KR(ret), K(job_id), K(return_code)); - if (OB_EAGAIN == ret) { - int64_t find_job_id = 0; - if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(arg.tenant_id_, find_job_id, trans))) { - LOG_WARN("fail to find rs job", KR(ret), K(arg)); - if (OB_ENTRY_NOT_EXIST == ret) { - FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] the specified rs job might has " - "been already deleted in table manually", KR(ret), K(arg), K(return_code)); - ret = OB_SUCCESS; - } - } else { - ret = OB_EAGAIN; - FLOG_WARN("[ALTER_TENANT_LOCALITY NOTICE] a non-checked rs job cannot be committed", KR(ret), K(arg), K(find_job_id)); - } - } - } } int temp_ret = OB_SUCCESS; if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) { @@ -24817,16 +24794,22 @@ int ObDDLService::record_tenant_locality_event_history( if (ALTER_LOCALITY_OP_INVALID == alter_locality_op) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid alter locality op", K(ret), K(alter_locality_op)); - } else if (ROLLBACK_ALTER_LOCALITY == alter_locality_op) { + } else { int64_t job_id = 0; if (OB_FAIL(ObAlterLocalityFinishChecker::find_rs_job(tenant_schema.get_tenant_id(), job_id, trans))) { - LOG_WARN("failed to find rs job", K(ret), "tenant_id", tenant_schema.get_tenant_id()); + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job", K(ret), "tenant_id", tenant_schema.get_tenant_id()); + } } else { - ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); // The change task is rolled back, this change failed + ret = RS_JOB_COMPLETE(job_id, OB_CANCELED, trans); FLOG_INFO("[ALTER_TENANT_LOCALITY NOTICE] cancel an old inprogress rs job due to rollback", KR(ret), "tenant_id", tenant_schema.get_tenant_id(), K(job_id)); } - if (FAILEDx(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, tenant_data_version))) { + } + if (OB_SUCC(ret) && ROLLBACK_ALTER_LOCALITY == alter_locality_op) { + if (OB_FAIL(GET_MIN_DATA_VERSION(OB_SYS_TENANT_ID, tenant_data_version))) { LOG_WARN("fail to get sys tenant's min data version", KR(ret)); } else if (tenant_data_version < DATA_VERSION_4_2_1_0) { job_type = ObRsJobType::JOB_TYPE_ROLLBACK_ALTER_TENANT_LOCALITY; diff --git a/src/rootserver/ob_root_utils.cpp b/src/rootserver/ob_root_utils.cpp index 4fc6c6f2ac..e799ba1544 100644 --- a/src/rootserver/ob_root_utils.cpp +++ b/src/rootserver/ob_root_utils.cpp @@ -2272,6 +2272,78 @@ int ObRootUtils::is_first_priority_primary_zone_changed( return ret; } +int ObRootUtils::check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type) +{ + int ret = OB_SUCCESS; + int64_t rs_job_id = 0; + int check_ret = OB_NEED_WAIT; + const char* rs_job_type_str = NULL; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_UNLIKELY(!ObRsJobTableOperator::is_valid_job_type(rs_job_type) + || NULL == (rs_job_type_str = ObRsJobTableOperator::get_job_type_str(rs_job_type)))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid job type", KR(ret), K(rs_job_type), KP(rs_job_type_str)); + } else if (OB_FAIL(find_rs_job(tenant_id, rs_job_type, rs_job_id))) { + // find the corresponding rs job at first, then check if we can complete it + // if we only find the rs job at the committing period, + // we do not know whether the job has been changed during checking process + // e.g. job 1 is the rs job before checking, + // right after checking, job 2 is created and job 1 is canceled by job 2, + // then committing process will find job 2 and complete job 2 immediately, + // which means, job 2 is completed without checking. + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type), K(rs_job_type_str)); + } + } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { + LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); + } else if (OB_NEED_WAIT != check_ret) { + if (OB_SUCC(RS_JOB_COMPLETE(rs_job_id, check_ret, *GCTX.sql_proxy_))) { + FLOG_INFO("[CHECK_AND_COMMIT_RS_JOB NOTICE] complete an inprogress rs job", + KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str)); + } else { + LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), + K(rs_job_type), K(rs_job_type_str)); + if (OB_EAGAIN == ret) { + FLOG_WARN("[CHECK_AND_COMMIT_RS_JOB NOTICE] the specified rs job might has " + "been already completed due to a new job or deleted in table manually", + KR(ret), K(tenant_id), K(rs_job_id), K(check_ret), K(rs_job_type), K(rs_job_type_str)); + ret = OB_SUCCESS; // no need to return the error code + } + } + } + return ret; +} +int ObRootUtils::find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id) +{ + int ret = OB_SUCCESS; + rs_job_id = 0; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ObRsJobTableOperator::is_valid_job_type(rs_job_type))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(rs_job_type)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.sql_proxy_ is null", KR(ret), KP(GCTX.sql_proxy_)); + } else { + switch (rs_job_type) { + case ObRsJobType::JOB_TYPE_ALTER_TENANT_LOCALITY: + ret = ObAlterLocalityFinishChecker::find_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_); + break; + case ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM: + ret = ObUnitManager::find_alter_resource_tenant_unit_num_rs_job(tenant_id, rs_job_id, *GCTX.sql_proxy_); + break; + default: ret = OB_NOT_SUPPORTED; + } + if (OB_FAIL(ret)) { + LOG_WARN("fail to find rs job", KR(ret), K(tenant_id), K(rs_job_type)); + } + } + return ret; +} + /////////////////////////////// ObClusterRole ObClusterInfoGetter::get_cluster_role_v2() diff --git a/src/rootserver/ob_root_utils.h b/src/rootserver/ob_root_utils.h index 165dfb01bb..9bb949a0f6 100644 --- a/src/rootserver/ob_root_utils.h +++ b/src/rootserver/ob_root_utils.h @@ -651,6 +651,8 @@ public: ObIArray &orig_first_primary_zone, ObIArray &new_first_primary_zone, bool &is_changed); + static int check_and_commit_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type); + static int find_rs_job(const uint64_t tenant_id, const ObRsJobType rs_job_type, int64_t &rs_job_id); template static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr, diff --git a/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp b/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp index 2030f412a2..ace5082aed 100644 --- a/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp +++ b/src/rootserver/ob_shrink_expand_resource_pool_checker.cpp @@ -133,8 +133,8 @@ int ObShrinkExpandResourcePoolChecker::check_shrink_resource_pool_finished_by_te // or a SHRINK task which has cleared deleting units in __all_unit table // check whether this task can be committed (i.e. ls is balanced) // if not exists, return OB_SUCCESS - if (OB_FAIL(check_and_commit_rs_job_(tenant_id))) { - LOG_WARN("fail to execute check_and_commit_rs_job_", KR(ret), K(tenant_id)); + if (OB_FAIL(ObRootUtils::check_and_commit_rs_job(tenant_id, ObRsJobType::JOB_TYPE_ALTER_RESOURCE_TENANT_UNIT_NUM))) { + LOG_WARN("fail to execute check_and_commit_rs_job", KR(ret), K(tenant_id)); } } else { //check shrink finish @@ -276,48 +276,5 @@ int ObShrinkExpandResourcePoolChecker::commit_tenant_shrink_resource_pool_(const } else {} // no more to do return ret; } - -int ObShrinkExpandResourcePoolChecker::check_and_commit_rs_job_(const uint64_t tenant_id) -{ - int ret = OB_SUCCESS; - int64_t check_job_id = 0; - int check_ret = OB_NEED_WAIT; - if (OB_ISNULL(sql_proxy_) || OB_ISNULL(unit_mgr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql_proxy_ or unit_mgr_ is null", KR(ret), KP(sql_proxy_), KP(unit_mgr_)); - } else if (OB_FAIL(unit_mgr_->find_alter_resource_tenant_unit_num_rs_job(tenant_id, check_job_id, *sql_proxy_))) { - // find the corresponding rs job at first, then check if we can complete it - // if we only find the rs job at the committing period, - // we do not know whether the job has been changed during checking process - // e.g. job 1 is the rs job before checking, - // right after checking, job 2 is created and job 1 is canceled by job 2, - // then committing process will find job 2 and complete job 2 immediately, - // which means, job 2 is completed without checking. - if (OB_ENTRY_NOT_EXIST == ret) { - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to find rs job", KR(ret), K(tenant_id)); - } - } else if (OB_FAIL(ObRootUtils::check_tenant_ls_balance(tenant_id, check_ret))) { - LOG_WARN("fail to execute check_tenant_ls_balance", KR(ret), K(tenant_id)); - } else if (OB_NEED_WAIT != check_ret) { - DEBUG_SYNC(BEFORE_FINISH_UNIT_NUM); - if (OB_FAIL(check_stop())) { - LOG_WARN("ObShrinkExpandResourcePoolChecker stop", KR(ret)); - } else if (OB_SUCC(RS_JOB_COMPLETE(check_job_id, check_ret, *sql_proxy_))) { - FLOG_INFO("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] complete an inprogress rs job", - KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); - } else { - LOG_WARN("fail to complete rs job", KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); - if (OB_EAGAIN == ret) { - FLOG_WARN("[ALTER_RESOURCE_TENANT_UNIT_NUM NOTICE] the specified rs job might has " - "been already completed due to a new job or deleted in table manually", - KR(ret), K(tenant_id), K(check_job_id), K(check_ret)); - ret = OB_SUCCESS; // no need to return the error code - } - } - } - return ret; -} } // end namespace rootserver } // end oceanbase diff --git a/src/rootserver/ob_shrink_expand_resource_pool_checker.h b/src/rootserver/ob_shrink_expand_resource_pool_checker.h index 801adea997..ca8c79c390 100644 --- a/src/rootserver/ob_shrink_expand_resource_pool_checker.h +++ b/src/rootserver/ob_shrink_expand_resource_pool_checker.h @@ -74,7 +74,6 @@ private: const ObIArray &unit_group_ids, bool &is_finished); int commit_tenant_shrink_resource_pool_(const uint64_t tenant_id); - int check_and_commit_rs_job_(const uint64_t tenant_id); private: const volatile bool &is_stop_; common::ObMySQLProxy *sql_proxy_; diff --git a/src/rootserver/ob_unit_manager.h b/src/rootserver/ob_unit_manager.h index 885e91d6ee..8eb97119c0 100644 --- a/src/rootserver/ob_unit_manager.h +++ b/src/rootserver/ob_unit_manager.h @@ -171,7 +171,7 @@ public: const uint64_t tenant_id, const int64_t new_unit_num, const common::ObIArray &unit_group_id_array); - int find_alter_resource_tenant_unit_num_rs_job( + static int find_alter_resource_tenant_unit_num_rs_job( const uint64_t tenant_id, int64_t &job_id, common::ObISQLClient &sql_proxy); diff --git a/src/share/inner_table/ob_inner_table_schema.21151_21200.cpp b/src/share/inner_table/ob_inner_table_schema.21151_21200.cpp index dd5a4bf2bf..a0c3f307a5 100644 --- a/src/share/inner_table/ob_inner_table_schema.21151_21200.cpp +++ b/src/share/inner_table/ob_inner_table_schema.21151_21200.cpp @@ -760,7 +760,7 @@ int ObInnerTableSchema::dba_ob_tenant_jobs_schema(ObTableSchema &table_schema) table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); if (OB_SUCC(ret)) { - if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT JOB_ID, JOB_TYPE, JOB_STATUS, RESULT_CODE, PROGRESS, gmt_create AS START_TIME, gmt_modified AS MODIFY_TIME, TENANT_ID, SQL_TEXT, EXTRA_INFO, RS_SVR_IP, RS_SVR_PORT FROM oceanbase.__all_rootservice_job WHERE JOB_TYPE in ( 'ALTER_TENANT_LOCALITY', 'ROLLBACK_ALTER_TENANT_LOCALITY', 'SHRINK_RESOURCE_TENANT_UNIT_NUM' ) )__"))) { + if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT JOB_ID, JOB_TYPE, JOB_STATUS, RESULT_CODE, PROGRESS, gmt_create AS START_TIME, gmt_modified AS MODIFY_TIME, TENANT_ID, SQL_TEXT, EXTRA_INFO, RS_SVR_IP, RS_SVR_PORT FROM oceanbase.__all_rootservice_job WHERE JOB_TYPE in ( 'ALTER_TENANT_LOCALITY', 'ROLLBACK_ALTER_TENANT_LOCALITY', 'SHRINK_RESOURCE_TENANT_UNIT_NUM', 'ALTER_TENANT_PRIMARY_ZONE', 'ALTER_RESOURCE_TENANT_UNIT_NUM' ) )__"))) { LOG_ERROR("fail to set view_definition", K(ret)); } } diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index 1ffdf55d64..51797429ca 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -17467,7 +17467,9 @@ WHERE JOB_TYPE in ( 'ALTER_TENANT_LOCALITY', 'ROLLBACK_ALTER_TENANT_LOCALITY', - 'SHRINK_RESOURCE_TENANT_UNIT_NUM' + 'SHRINK_RESOURCE_TENANT_UNIT_NUM', + 'ALTER_TENANT_PRIMARY_ZONE', + 'ALTER_RESOURCE_TENANT_UNIT_NUM' ) """.replace("\n", " ") )