set restore concurrency based on the cpu nums
This commit is contained in:
@ -1505,20 +1505,33 @@ int ObRestoreService::reset_schema_status(const uint64_t tenant_id, common::ObMy
|
|||||||
int ObRestoreService::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
|
int ObRestoreService::may_update_restore_concurrency_(const uint64_t new_tenant_id, const share::ObPhysicalRestoreJob &job_info)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t concurrency = job_info.get_concurrency();
|
double cpu_count = 0;
|
||||||
const ObString &tenant_name = job_info.get_tenant_name();
|
|
||||||
int64_t ha_high_thread_score = 0;
|
int64_t ha_high_thread_score = 0;
|
||||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id));
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(new_tenant_id));
|
||||||
if (tenant_config.is_valid()) {
|
// restore concurrency controls the number of threads used by restore dag.
|
||||||
ha_high_thread_score = tenant_config->ha_high_thread_score;
|
// if cpu number is less than 10, use the default value.
|
||||||
}
|
// if cpu number is between 10 ~ 100, let concurrency equals to the cpu number.
|
||||||
|
// if cpu number is exceed 100, let concurrency equals to 100.
|
||||||
|
const int64_t LOW_CPU_LIMIT = 10;
|
||||||
|
const int64_t MAX_CPU_LIMIT = 100;
|
||||||
if (!job_info.is_valid()) {
|
if (!job_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("get invalid args", K(ret), K(job_info));
|
LOG_WARN("get invalid args", K(ret), K(job_info));
|
||||||
} else if (0 == concurrency || 0 != ha_high_thread_score) {
|
} else if (tenant_config.is_valid() && OB_FALSE_IT(ha_high_thread_score = tenant_config->ha_high_thread_score)) {
|
||||||
LOG_INFO("do nothing", K(concurrency), K(ha_high_thread_score));
|
} else if (0 != ha_high_thread_score) {
|
||||||
} else if (OB_FAIL(update_restore_concurrency_(tenant_name, new_tenant_id, concurrency))) {
|
LOG_INFO("ha high thread score has been set", K(ha_high_thread_score));
|
||||||
LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
|
} else if (OB_FAIL(ObRestoreUtil::get_restore_tenant_cpu_count(*sql_proxy_, new_tenant_id, cpu_count))) {
|
||||||
|
LOG_WARN("failed to get restore tenant cpu count", K(ret), K(new_tenant_id));
|
||||||
|
} else {
|
||||||
|
int64_t concurrency = job_info.get_concurrency();
|
||||||
|
if (LOW_CPU_LIMIT < cpu_count && MAX_CPU_LIMIT >= cpu_count) {
|
||||||
|
concurrency = std::max(static_cast<int64_t>(cpu_count), concurrency);
|
||||||
|
} else if (MAX_CPU_LIMIT < cpu_count) {
|
||||||
|
concurrency = MAX_CPU_LIMIT;
|
||||||
|
}
|
||||||
|
if (OB_FAIL(update_restore_concurrency_(job_info.get_tenant_name(), new_tenant_id, concurrency))) {
|
||||||
|
LOG_WARN("failed to update restore concurrency", K(ret), K(job_info));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
#include "storage/ls/ob_ls_meta_package.h"//ls_meta
|
#include "storage/ls/ob_ls_meta_package.h"//ls_meta
|
||||||
#include "share/backup/ob_archive_path.h"
|
#include "share/backup/ob_archive_path.h"
|
||||||
#include "share/ob_upgrade_utils.h"
|
#include "share/ob_upgrade_utils.h"
|
||||||
|
#include "share/ob_unit_table_operator.h"
|
||||||
|
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
using namespace oceanbase;
|
using namespace oceanbase;
|
||||||
@ -970,6 +971,37 @@ int ObRestoreUtil::check_physical_restore_finish(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObRestoreUtil::get_restore_tenant_cpu_count(
|
||||||
|
common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
share::ObUnitTableOperator unit_op;
|
||||||
|
common::ObArray<share::ObResourcePool> pools;
|
||||||
|
common::ObArray<uint64_t> unit_config_ids;
|
||||||
|
common::ObArray<ObUnitConfig> configs;
|
||||||
|
if (OB_FAIL(unit_op.init(proxy))) {
|
||||||
|
LOG_WARN("failed to init proxy", K(ret));
|
||||||
|
} else if (OB_FAIL(unit_op.get_resource_pools(tenant_id, pools))) {
|
||||||
|
LOG_WARN("failed to get resource pool", K(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
ARRAY_FOREACH(pools, i) {
|
||||||
|
if (OB_FAIL(unit_config_ids.push_back(pools.at(i).unit_config_id_))) {
|
||||||
|
LOG_WARN("failed to push back unit config", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (FAILEDx(unit_op.get_unit_configs(unit_config_ids, configs))) {
|
||||||
|
LOG_WARN("failed to get unit configs", K(ret));
|
||||||
|
}
|
||||||
|
double max_cpu = OB_MAX_CPU_NUM;
|
||||||
|
ARRAY_FOREACH(configs, i) {
|
||||||
|
max_cpu = std::min(max_cpu, configs.at(i).max_cpu());
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
cpu_count = max_cpu;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObRestoreUtil::convert_restore_timestamp_to_scn_(
|
int ObRestoreUtil::convert_restore_timestamp_to_scn_(
|
||||||
const ObString ×tamp,
|
const ObString ×tamp,
|
||||||
const common::ObTimeZoneInfoWrap &time_zone_wrap,
|
const common::ObTimeZoneInfoWrap &time_zone_wrap,
|
||||||
|
@ -72,7 +72,7 @@ public:
|
|||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
palf::PalfBaseInfo &palf_base_info);
|
palf::PalfBaseInfo &palf_base_info);
|
||||||
static int check_physical_restore_finish(common::ObISQLClient &proxy, uint64_t tenant_id, bool &is_finish, bool &is_failed);
|
static int check_physical_restore_finish(common::ObISQLClient &proxy, uint64_t tenant_id, bool &is_finish, bool &is_failed);
|
||||||
|
static int get_restore_tenant_cpu_count(common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count);
|
||||||
private:
|
private:
|
||||||
static int fill_backup_info_(
|
static int fill_backup_info_(
|
||||||
const obrpc::ObPhysicalRestoreTenantArg &arg,
|
const obrpc::ObPhysicalRestoreTenantArg &arg,
|
||||||
|
Reference in New Issue
Block a user