diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 09ad407810..60d2d6bca8 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -1505,20 +1505,33 @@ int ObRestoreService::reset_schema_status(const uint64_t tenant_id, common::ObMy int ObRestoreService::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info) { int ret = OB_SUCCESS; - const int64_t concurrency = job_info.get_concurrency(); - const ObString &tenant_name = job_info.get_tenant_name(); + double cpu_count = 0; int64_t ha_high_thread_score = 0; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id)); - if (tenant_config.is_valid()) { - ha_high_thread_score = tenant_config->ha_high_thread_score; - } + // restore concurrency controls the number of threads used by restore dag. + // if cpu number is less than 10, use the default value. + // if cpu number is between 10 ~ 100, let concurrency equals to the cpu number. + // if cpu number is exceed 100, let concurrency equals to 100. + const int64_t LOW_CPU_LIMIT = 10; + const int64_t MAX_CPU_LIMIT = 100; if (!job_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(job_info)); - } else if (0 == concurrency || 0 != ha_high_thread_score) { - LOG_INFO("do nothing", K(concurrency), K(ha_high_thread_score)); - } else if (OB_FAIL(update_restore_concurrency_(tenant_name, new_tenant_id, concurrency))) { - LOG_WARN("failed to update restore concurrency", K(ret), K(job_info)); + } else if (tenant_config.is_valid() && OB_FALSE_IT(ha_high_thread_score = tenant_config->ha_high_thread_score)) { + } else if (0 != ha_high_thread_score) { + LOG_INFO("ha high thread score has been set", K(ha_high_thread_score)); + } else if (OB_FAIL(ObRestoreUtil::get_restore_tenant_cpu_count(*sql_proxy_, new_tenant_id, cpu_count))) { + LOG_WARN("failed to get restore tenant cpu count", K(ret), K(new_tenant_id)); + } else { + int64_t concurrency = job_info.get_concurrency(); + if (LOW_CPU_LIMIT < cpu_count && MAX_CPU_LIMIT >= cpu_count) { + concurrency = std::max(static_cast(cpu_count), concurrency); + } else if (MAX_CPU_LIMIT < cpu_count) { + concurrency = MAX_CPU_LIMIT; + } + if (OB_FAIL(update_restore_concurrency_(job_info.get_tenant_name(), new_tenant_id, concurrency))) { + LOG_WARN("failed to update restore concurrency", K(ret), K(job_info)); + } } return ret; } diff --git a/src/rootserver/restore/ob_restore_util.cpp b/src/rootserver/restore/ob_restore_util.cpp index d07dbae2e5..6cf265c42e 100644 --- a/src/rootserver/restore/ob_restore_util.cpp +++ b/src/rootserver/restore/ob_restore_util.cpp @@ -29,6 +29,7 @@ #include "storage/ls/ob_ls_meta_package.h"//ls_meta #include "share/backup/ob_archive_path.h" #include "share/ob_upgrade_utils.h" +#include "share/ob_unit_table_operator.h" using namespace oceanbase::common; using namespace oceanbase; @@ -970,6 +971,37 @@ int ObRestoreUtil::check_physical_restore_finish( return ret; } +int ObRestoreUtil::get_restore_tenant_cpu_count( + common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count) +{ + int ret = OB_SUCCESS; + share::ObUnitTableOperator unit_op; + common::ObArray pools; + common::ObArray unit_config_ids; + common::ObArray configs; + if (OB_FAIL(unit_op.init(proxy))) { + LOG_WARN("failed to init proxy", K(ret)); + } else if (OB_FAIL(unit_op.get_resource_pools(tenant_id, pools))) { + LOG_WARN("failed to get resource pool", K(ret), K(tenant_id)); + } + ARRAY_FOREACH(pools, i) { + if (OB_FAIL(unit_config_ids.push_back(pools.at(i).unit_config_id_))) { + LOG_WARN("failed to push back unit config", K(ret)); + } + } + if (FAILEDx(unit_op.get_unit_configs(unit_config_ids, configs))) { + LOG_WARN("failed to get unit configs", K(ret)); + } + double max_cpu = OB_MAX_CPU_NUM; + ARRAY_FOREACH(configs, i) { + max_cpu = std::min(max_cpu, configs.at(i).max_cpu()); + } + if (OB_SUCC(ret)) { + cpu_count = max_cpu; + } + return ret; +} + int ObRestoreUtil::convert_restore_timestamp_to_scn_( const ObString ×tamp, const common::ObTimeZoneInfoWrap &time_zone_wrap, diff --git a/src/rootserver/restore/ob_restore_util.h b/src/rootserver/restore/ob_restore_util.h index f51e6bbd6b..fae40eb154 100644 --- a/src/rootserver/restore/ob_restore_util.h +++ b/src/rootserver/restore/ob_restore_util.h @@ -72,7 +72,7 @@ public: const share::ObLSID &ls_id, palf::PalfBaseInfo &palf_base_info); static int check_physical_restore_finish(common::ObISQLClient &proxy, uint64_t tenant_id, bool &is_finish, bool &is_failed); - + static int get_restore_tenant_cpu_count(common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count); private: static int fill_backup_info_( const obrpc::ObPhysicalRestoreTenantArg &arg,