[CP][bugfix] control max threads allocates to a sqc
This commit is contained in:
@ -30687,11 +30687,11 @@ int ObDDLService::init_system_variables(
|
|||||||
HEAP_VAR(ObUnitConfig, unit_config) {
|
HEAP_VAR(ObUnitConfig, unit_config) {
|
||||||
if (OB_SYS_TENANT_ID == sys_variable_schema.get_tenant_id()) {
|
if (OB_SYS_TENANT_ID == sys_variable_schema.get_tenant_id()) {
|
||||||
// When creating a system tenant, the default value of px_thread_count is related to
|
// When creating a system tenant, the default value of px_thread_count is related to
|
||||||
// default sys tenant max cpu
|
// default sys tenant min cpu
|
||||||
const int64_t sys_default_max_cpu =
|
const int64_t sys_default_min_cpu =
|
||||||
static_cast<int64_t>(GCONF.get_sys_tenant_default_max_cpu());
|
static_cast<int64_t>(GCONF.get_sys_tenant_default_min_cpu());
|
||||||
default_px_thread_count = ObTenantCpuShare::calc_px_pool_share(
|
default_px_thread_count = ObTenantCpuShare::calc_px_pool_share(
|
||||||
sys_variable_schema.get_tenant_id(), sys_default_max_cpu);
|
sys_variable_schema.get_tenant_id(), sys_default_min_cpu);
|
||||||
} else if (OB_UNLIKELY(NULL == unit_mgr_)) {
|
} else if (OB_UNLIKELY(NULL == unit_mgr_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unit_mgr_ is null", K(ret), KP(unit_mgr_));
|
LOG_WARN("unit_mgr_ is null", K(ret), KP(unit_mgr_));
|
||||||
@ -30702,7 +30702,7 @@ int ObDDLService::init_system_variables(
|
|||||||
arg.pool_list_.at(0), unit_config))) {
|
arg.pool_list_.at(0), unit_config))) {
|
||||||
LOG_WARN("fail to get unit config", K(ret));
|
LOG_WARN("fail to get unit config", K(ret));
|
||||||
} else {
|
} else {
|
||||||
int64_t cpu_count = static_cast<int64_t>(unit_config.unit_resource().max_cpu());
|
int64_t cpu_count = static_cast<int64_t>(unit_config.unit_resource().min_cpu());
|
||||||
default_px_thread_count = ObTenantCpuShare::calc_px_pool_share(
|
default_px_thread_count = ObTenantCpuShare::calc_px_pool_share(
|
||||||
sys_variable_schema.get_tenant_id(), cpu_count);
|
sys_variable_schema.get_tenant_id(), cpu_count);
|
||||||
}
|
}
|
||||||
@ -30711,7 +30711,7 @@ int ObDDLService::init_system_variables(
|
|||||||
|
|
||||||
if (OB_SUCC(ret) && use_default_parallel_servers_target && default_px_thread_count > 0) {
|
if (OB_SUCC(ret) && use_default_parallel_servers_target && default_px_thread_count > 0) {
|
||||||
// target cannot be less than 3, otherwise any px query will not come in
|
// target cannot be less than 3, otherwise any px query will not come in
|
||||||
int64_t default_px_servers_target = std::max(3L, static_cast<int64_t>(default_px_thread_count * 0.8));
|
int64_t default_px_servers_target = std::max(3L, static_cast<int64_t>(default_px_thread_count));
|
||||||
VAR_INT_TO_STRING(val_buf, default_px_servers_target);
|
VAR_INT_TO_STRING(val_buf, default_px_servers_target);
|
||||||
SET_TENANT_VARIABLE(SYS_VAR_PARALLEL_SERVERS_TARGET, val_buf);
|
SET_TENANT_VARIABLE(SYS_VAR_PARALLEL_SERVERS_TARGET, val_buf);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -467,20 +467,18 @@ void ObVirtualTenantManager::ObTenantInfo::reset()
|
|||||||
is_loaded_ = false;
|
is_loaded_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ObTenantCpuShare::calc_px_pool_share(uint64_t tenant_id, int64_t cpu_count)
|
int64_t ObTenantCpuShare::calc_px_pool_share(uint64_t tenant_id, int64_t min_cpu)
|
||||||
{
|
{
|
||||||
UNUSED(tenant_id);
|
int64_t share = 3;
|
||||||
/* 按照 cpu_count * concurrency * 0.1 作为默认值
|
int ret = OB_SUCCESS;
|
||||||
* 但确保最少分配 3 个 线程给 px pool,
|
oceanbase::omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||||
* 计算出的默认值小于 3 时强制设置为 3
|
if (!tenant_config.is_valid()) {
|
||||||
*
|
share = 3;
|
||||||
* 为什么要保证至少为 3? 这是为了尽可能让 mysqltest
|
COMMON_LOG(ERROR, "fail get tenant config. share default to 3", K(share));
|
||||||
* 都能过。mysqltest 里遇到一般的右深树时,3 个线程
|
} else {
|
||||||
* 能够保证调度成功,2 则会超时。
|
share = std::max(3L, min_cpu * tenant_config->px_workers_per_cpu_quota);
|
||||||
*/
|
}
|
||||||
return std::max(3L,
|
return share;
|
||||||
cpu_count *
|
|
||||||
static_cast<int64_t>(static_cast<double>(GCONF.px_workers_per_cpu_quota.get()) * 0.1));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace common
|
} // namespace common
|
||||||
|
|||||||
@ -129,7 +129,7 @@ class ObTenantCpuShare
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/* Return value: The number of px threads assigned to tenant_id tenant */
|
/* Return value: The number of px threads assigned to tenant_id tenant */
|
||||||
static int64_t calc_px_pool_share(uint64_t tenant_id, int64_t cpu_count);
|
static int64_t calc_px_pool_share(uint64_t tenant_id, int64_t min_cpu);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -315,7 +315,7 @@ DEF_INT(global_write_halt_residual_memory, OB_CLUSTER_PARAMETER, "30", "(0, 100)
|
|||||||
"disable write to memstore when observer memstore free memory(plus memory hold by blockcache) lower than this limit, Range: (0, 100)"
|
"disable write to memstore when observer memstore free memory(plus memory hold by blockcache) lower than this limit, Range: (0, 100)"
|
||||||
"limit calc by (memory_limit - system_memory) * global_write_halt_residual_memory/100",
|
"limit calc by (memory_limit - system_memory) * global_write_halt_residual_memory/100",
|
||||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
DEF_INT(px_workers_per_cpu_quota, OB_CLUSTER_PARAMETER, "10", "[0,20]",
|
DEF_INT(px_workers_per_cpu_quota, OB_TENANT_PARAMETER, "10", "[0,20]",
|
||||||
"the ratio(integer) between the number of system allocated px workers vs "
|
"the ratio(integer) between the number of system allocated px workers vs "
|
||||||
"the maximum number of threads that can be scheduled concurrently. Range: [0, 20]",
|
"the maximum number of threads that can be scheduled concurrently. Range: [0, 20]",
|
||||||
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
|
|||||||
@ -209,7 +209,7 @@ void ObPxSubAdmission::acquire(int64_t max, int64_t min, int64_t &acquired_cnt)
|
|||||||
LOG_WARN_RET(OB_ERR_UNEXPECTED, "get tenant config failed, use default cpu_quota_concurrency");
|
LOG_WARN_RET(OB_ERR_UNEXPECTED, "get tenant config failed, use default cpu_quota_concurrency");
|
||||||
upper_bound = tenant->unit_min_cpu() * 4;
|
upper_bound = tenant->unit_min_cpu() * 4;
|
||||||
} else {
|
} else {
|
||||||
upper_bound = tenant->unit_min_cpu() * tenant_config->cpu_quota_concurrency;
|
upper_bound = tenant->unit_min_cpu() * tenant_config->px_workers_per_cpu_quota;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
acquired_cnt = std::min(max, upper_bound);
|
acquired_cnt = std::min(max, upper_bound);
|
||||||
|
|||||||
Reference in New Issue
Block a user