[FEAT MERGE][CP] [4.2.3 patch 4.3.2] add global CPU isolation

zhjc1124 2024-06-17 11:32:57 +00:00 committed by ob-robot
parent 82d6756606
commit 9d01875f1e
20 changed files with 994 additions and 687 deletions

View File

@ -2410,6 +2410,7 @@ void ObMultiTenant::run1()
if (!OB_ISNULL(*it)) {
ObTaskController::get().allow_next_syslog();
LOG_INFO("dump tenant info", "tenant", **it);
(*it)->print_throttled_time();
}
}
}

View File

@ -604,6 +604,24 @@ int ObResourceGroup::clear_worker()
return ret;
}
int ObResourceGroup::get_throttled_time(int64_t &throttled_time)
{
int ret = OB_SUCCESS;
int64_t current_throttled_time_us = -1;
if (OB_ISNULL(GCTX.cgroup_ctrl_) || !GCTX.cgroup_ctrl_->is_valid()) {
// do nothing
} else if (OB_FAIL(GCTX.cgroup_ctrl_->get_throttled_time(tenant_->id(),
current_throttled_time_us,
group_id_,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN("get throttled time failed", K(ret), K(tenant_->id()), K(group_id_));
} else if (current_throttled_time_us > 0) {
throttled_time = current_throttled_time_us - throttled_time_us_;
throttled_time_us_ = current_throttled_time_us;
}
return ret;
}
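get_throttled_time() is a delta counter: the cgroup reports a monotonically increasing throttled time, and each call returns only the increment since the previous call, caching the cumulative value in throttled_time_us_. A minimal standalone sketch of the pattern, with illustrative names:
#include <cstdint>
// Delta-counter sketch: consume() returns the increment of a monotonic
// counter since the previous call, mirroring the throttled_time_us_ caching.
class ThrottledTimeDelta {
public:
  int64_t consume(const int64_t cumulative_us) {
    const int64_t delta = cumulative_us - last_us_;
    last_us_ = cumulative_us; // remember for the next round
    return delta;
  }
private:
  int64_t last_us_ = 0;
};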
int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group)
{
int ret = OB_SUCCESS;
@ -1127,8 +1145,9 @@ void ObTenant::destroy()
DESTROY_ENTITY(ctx_);
ctx_ = nullptr;
}
if (cgroup_ctrl_.is_valid()
&& OB_SUCCESS != (tmp_ret = cgroup_ctrl_.remove_tenant_cgroup(id_))) {
if (cgroup_ctrl_.is_valid() &&
OB_TMP_FAIL(cgroup_ctrl_.remove_both_cgroup(
id_, OB_INVALID_GROUP_ID, GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN_RET(tmp_ret, "remove tenant cgroup failed", K(tmp_ret), K_(id));
}
group_map_.destroy_group();
@ -1165,36 +1184,14 @@ void ObTenant::set_unit_max_cpu(double cpu)
{
int tmp_ret = OB_SUCCESS;
unit_max_cpu_ = cpu;
int32_t cfs_period_us = 0;
int32_t cfs_period_us_new = 0;
if (!cgroup_ctrl_.is_valid() || is_meta_tenant(id_)) {
// do nothing
} else if (is_sys_tenant(id_)) {
int32_t sys_cfs_quota_us = -1;
if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(sys_cfs_quota_us, id_))) {
LOG_WARN_RET(tmp_ret, "set sys tennat cpu cfs quota failed", K(tmp_ret), K_(id), K(sys_cfs_quota_us));
}
} else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
LOG_WARN_RET(tmp_ret, "fail get cpu cfs period", K_(id));
} else {
uint32_t loop_times = 0;
// to avoid kernel scaling cfs_period_us after get cpu_cfs_period,
// we should check whether cfs_period_us has been changed after set cpu_cfs_quota.
while (OB_SUCCESS == tmp_ret && cfs_period_us_new != cfs_period_us) {
cfs_period_us = cfs_period_us_new;
int32_t cfs_quota_us = static_cast<int32_t>(cfs_period_us * cpu);
if (OB_TMP_FAIL(cgroup_ctrl_.set_cpu_cfs_quota(cfs_quota_us, id_))) {
LOG_WARN_RET(tmp_ret, "set cpu cfs quota failed", K_(id), K(cfs_quota_us));
} else if (OB_TMP_FAIL(cgroup_ctrl_.get_cpu_cfs_period(cfs_period_us_new, id_, INT64_MAX))) {
LOG_ERROR_RET(tmp_ret, "fail get cpu cfs period", K_(id));
} else {
loop_times++;
if (loop_times > 3) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_ERROR_RET(tmp_ret, "cpu_cfs_period has been always changing, thread may be hung", K_(id), K(cfs_period_us), K(cfs_period_us_new), K(cfs_quota_us));
}
}
}
} else if (OB_TMP_FAIL(cgroup_ctrl_.set_both_cpu_cfs_quota(
id_,
is_sys_tenant(id_) ? -1 : cpu,
OB_INVALID_GROUP_ID,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN_RET(tmp_ret, "set tenant cpu cfs quota failed", K(tmp_ret), K_(id));
}
}
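The deleted retry loop existed because cfs_quota_us had to be computed from a cfs_period_us that the kernel may rescale; set_both_cpu_cfs_quota() now takes the vCPU count and derives the quota internally, for both the foreground and background trees. A sketch of the underlying cgroup v1 arithmetic (the helper name is illustrative):
#include <cstdint>
// cgroup v1: usable vCPUs == cpu.cfs_quota_us / cpu.cfs_period_us,
// and a quota of -1 means unlimited (used for the sys tenant above).
static int32_t cpu_to_cfs_quota_us(const double cpu, const int32_t cfs_period_us) {
  if (cpu < 0) {
    return -1; // unlimited
  }
  return static_cast<int32_t>(cfs_period_us * cpu);
}
// e.g. cpu = 4.0 with the common 100000us period => quota = 400000us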
@ -1202,11 +1199,13 @@ void ObTenant::set_unit_min_cpu(double cpu)
{
int tmp_ret = OB_SUCCESS;
unit_min_cpu_ = cpu;
const double default_cpu_shares = 1024.0;
int32_t cpu_shares = static_cast<int32_t>(default_cpu_shares * cpu);
if (cgroup_ctrl_.is_valid()
&& OB_SUCCESS != (tmp_ret = cgroup_ctrl_.set_cpu_shares(cpu_shares, id_))) {
LOG_WARN_RET(tmp_ret, "set cpu shares failed", K(tmp_ret), K_(id), K(cpu_shares));
if (cgroup_ctrl_.is_valid() &&
OB_TMP_FAIL(cgroup_ctrl_.set_both_cpu_shares(
id_,
cpu,
OB_INVALID_GROUP_ID,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN_RET(tmp_ret, "set tenant cpu shares failed", K(tmp_ret), K_(id), K(cpu));
}
}
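The inline 1024 * cpu computation moves into set_both_cpu_shares(), which now receives the vCPU count and writes cpu.shares to the foreground and, when enabled, the background cgroup. The conversion it replaces, for reference (1024 per core is the cgroup v1 convention):
// cpu.shares is a relative weight: with the 1024-per-core convention,
// unit_min_cpu = 2.5 maps to shares = 2560.
static int32_t cpu_to_shares(const double cpu) {
  const double default_cpu_shares = 1024.0;
  return static_cast<int32_t>(default_cpu_shares * cpu);
}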
@ -1606,6 +1605,73 @@ int ObTenant::timeup()
return OB_SUCCESS;
}
void ObTenant::print_throttled_time()
{
class ThrottledTimeLog
{
public:
ThrottledTimeLog(ObTenant *tenant) : tenant_(tenant)
{}
~ThrottledTimeLog()
{}
int64_t to_string(char *buf, const int64_t len) const
{
int64_t pos = 0;
int tmp_ret = OB_SUCCESS;
int64_t tenant_throttled_time = 0;
int64_t group_throttled_time = 0;
ObResourceGroupNode *iter = NULL;
ObResourceGroup *group = nullptr;
ObCgSet &set = ObCgSet::instance();
while (NULL != (iter = tenant_->group_map_.quick_next(iter))) {
group = static_cast<ObResourceGroup *>(iter);
if (!is_user_group(group->group_id_)) {
if (OB_TMP_FAIL(group->get_throttled_time(group_throttled_time))) {
LOG_WARN_RET(tmp_ret, "get throttled time failed", K(tmp_ret), K(group));
} else {
tenant_throttled_time += group_throttled_time;
databuff_printf(buf, len, pos, "group_id: %d, group: %s, throttled_time: %ld;", group->group_id_, set.name_of_id(group->group_id_), group_throttled_time);
}
}
}
share::ObGroupName g_name;
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_TMP_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_->id_, tenant_holder))) {
LOG_WARN_RET(tmp_ret, "get tenant io manager failed", K(tmp_ret), K(tenant_->id_));
} else {
for (int64_t i = 0; i < tenant_holder.get_ptr()->get_group_num(); i++) {
if (!tenant_holder.get_ptr()->get_io_config().group_configs_.at(i).deleted_ &&
!tenant_holder.get_ptr()->get_io_config().group_configs_.at(i).cleared_) {
uint64_t group_id = tenant_holder.get_ptr()->get_io_config().group_ids_.at(i);
if (OB_TMP_FAIL(tenant_holder.get_ptr()->get_throttled_time(group_id, group_throttled_time))) {
LOG_WARN_RET(tmp_ret, "get throttled time failed", K(tmp_ret), K(group_id));
} else if (OB_TMP_FAIL(tenant_->cgroup_ctrl_.get_group_info_by_group_id(tenant_->id_, group_id, g_name))) {
LOG_WARN_RET(tmp_ret, "get group_name by id failed", K(tmp_ret), K(group_id));
} else {
tenant_throttled_time += group_throttled_time;
databuff_printf(buf,
len,
pos,
"group_id: %ld, group: %.*s, throttled_time: %ld;",
group_id,
g_name.get_value().length(),
g_name.get_value().ptr(),
group_throttled_time);
}
}
}
}
databuff_printf(
buf, len, pos, "tenant_id: %lu, tenant_throttled_time: %ld;", tenant_->id_, tenant_throttled_time);
return pos;
}
ObTenant *tenant_;
};
ThrottledTimeLog throttled_time_log(this);
LOG_INFO("dump throttled time info", K(id_), K(throttled_time_log));
}
void ObTenant::handle_retry_req(bool need_clear)
{
int ret = OB_SUCCESS;

View File

@ -291,6 +291,7 @@ public:
void check_worker_count();
void check_worker_count(ObThWorker &w);
int clear_worker();
int get_throttled_time(int64_t &throttled_time);
TO_STRING_KV("group_id", group_id_,
"queue_size", req_queue_.size(),
"recv_req_cnt", recv_req_cnt_,
@ -316,6 +317,7 @@ private:
int nesting_worker_cnt_;
ObTenant *tenant_;
share::ObCgroupCtrl *cgroup_ctrl_;
int64_t throttled_time_us_;
};
typedef common::FixedHash2<ObResourceGroupNode> GroupHash;
@ -441,6 +443,7 @@ public:
void update_queue_size();
int timeup();
void print_throttled_time();
TO_STRING_KV(K_(id),
K_(tenant_meta),

View File

@ -76,6 +76,7 @@ const int64_t USER_RESOURCE_GROUP_START_ID = 10000;
const int64_t SYS_RESOURCE_GROUP_CNT = SYS_RESOURCE_GROUP_END_ID - SYS_RESOURCE_GROUP_START_ID;
const uint64_t USER_RESOURCE_OTHER_GROUP_ID = 0;
const uint64_t OB_INVALID_GROUP_ID = UINT64_MAX;
static constexpr char BACKGROUND_CGROUP[] = "background";
OB_INLINE bool is_valid_group(const uint64_t group_id)
{
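BACKGROUND_CGROUP is the base_path threaded through every cgroup call in this patch: an empty base_path addresses the ordinary tenant tree, while "background" addresses a mirrored tree for background tasks. A hedged sketch of the path composition this implies (the exact directory layout is an assumption for illustration):
#include <cstdint>
#include <cstdio>
// Illustrative only: compose a tenant cgroup path under an optional base.
static void build_tenant_path(char *buf, const int len,
                              const char *base_path, const uint64_t tenant_id) {
  if (base_path != nullptr && base_path[0] != '\0') {
    // e.g. cgroup/background/tenant_1002
    snprintf(buf, len, "cgroup/%s/tenant_%04lu", base_path, static_cast<unsigned long>(tenant_id));
  } else {
    // e.g. cgroup/tenant_1002
    snprintf(buf, len, "cgroup/tenant_%04lu", static_cast<unsigned long>(tenant_id));
  }
}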

View File

@ -19,6 +19,7 @@
#include "share/rc/ob_tenant_base.h"
#include "logservice/leader_coordinator/ob_failure_detector.h"
#include "share/errsim_module/ob_errsim_module_interface_imp.h"
#include "share/resource_manager/ob_cgroup_ctrl.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
@ -1400,6 +1401,7 @@ int ObTenantIOManager::delete_consumer_group_config(const int64_t group_id)
} else if (OB_STATE_NOT_MATCH == ret) {
// group deleted twice; it may have been deleted by delete_directive or delete_plan
LOG_INFO("group delete twice", K(ret), K(index), K(group_id));
ret = OB_SUCCESS;
} else {
LOG_WARN("get index from map failed", K(ret), K(group_id), K(index));
}
@ -1489,100 +1491,179 @@ uint64_t ObTenantIOManager::get_usage_index(const int64_t group_id)
}
return index;
}
void ObTenantIOManager::print_io_status()
{
if (is_working() && is_inited_) {
char io_status[1024] = { 0 };
bool need_print_io_config = false;
ObIOUsage::AvgItems avg_iops, avg_size, avg_rt;
class IOStatusLog
{
public:
IOStatusLog(ObTenantIOManager *io_manager, ObIOUsage::AvgItems *avg_size, ObIOUsage::AvgItems *avg_iops,
ObIOUsage::AvgItems *avg_rt)
: io_manager_(io_manager), avg_size_(avg_size), avg_iops_(avg_iops), avg_rt_(avg_rt)
{}
~IOStatusLog()
{}
int64_t to_string(char *buf, const int64_t len) const
{
int64_t pos = 0;
for (int64_t i = 1; i < io_manager_->io_usage_.get_io_usage_num() && i < avg_size_->count() &&
i < avg_iops_->count() && i < avg_rt_->count();
++i) {
if (!io_manager_->io_config_.group_configs_.at(i - 1).deleted_) {
if (avg_size_->at(i).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"group_id: %ld, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
io_manager_->io_config_.group_ids_.at(i - 1),
avg_size_->at(i).at(static_cast<int>(ObIOMode::READ)),
avg_iops_->at(i).at(static_cast<int>(ObIOMode::READ)),
avg_rt_->at(i).at(static_cast<int>(ObIOMode::READ)));
}
if (avg_size_->at(i).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"group_id: %ld, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
io_manager_->io_config_.group_ids_.at(i - 1),
avg_size_->at(i).at(static_cast<int>(ObIOMode::WRITE)),
avg_iops_->at(i).at(static_cast<int>(ObIOMode::WRITE)),
avg_rt_->at(i).at(static_cast<int>(ObIOMode::WRITE)));
}
}
}
// OTHER_GROUPS
if (avg_size_->at(0).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"group_id: %ld, group_name: %s, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
0L,
"OTHER_GROUPS",
avg_size_->at(0).at(static_cast<int>(ObIOMode::READ)),
avg_iops_->at(0).at(static_cast<int>(ObIOMode::READ)),
avg_rt_->at(0).at(static_cast<int>(ObIOMode::READ)));
}
if (avg_size_->at(0).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"group_id: %ld, group_name: %s, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
0L,
"OTHER_GROUPS",
avg_size_->at(0).at(static_cast<int>(ObIOMode::WRITE)),
avg_iops_->at(0).at(static_cast<int>(ObIOMode::WRITE)),
avg_rt_->at(0).at(static_cast<int>(ObIOMode::WRITE)));
}
return pos;
}
ObTenantIOManager *io_manager_;
ObIOUsage::AvgItems *avg_size_;
ObIOUsage::AvgItems *avg_iops_;
ObIOUsage::AvgItems *avg_rt_;
};
ObIOUsage::AvgItems avg_size;
ObIOUsage::AvgItems avg_iops;
ObIOUsage::AvgItems avg_rt;
io_usage_.calculate_io_usage();
io_backup_usage_.calculate_io_usage();
ObSysIOUsage::SysAvgItems sys_avg_iops, sys_avg_size, sys_avg_rt;
io_usage_.get_io_usage(avg_iops, avg_size, avg_rt);
io_backup_usage_.get_io_usage(sys_avg_iops, sys_avg_size, sys_avg_rt);
for (int64_t i = 1; i < io_usage_.get_io_usage_num() && i < avg_size.count() && i < avg_iops.count() && i < avg_rt.count(); ++i) {
if (io_config_.group_configs_.at(i-1).deleted_) {
continue;
}
if (avg_size.at(i).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "group_id: %ld, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f",
io_config_.group_ids_.at(i-1),
avg_size.at(i).at(static_cast<int>(ObIOMode::READ)),
avg_iops.at(i).at(static_cast<int>(ObIOMode::READ)),
avg_rt.at(i).at(static_cast<int>(ObIOMode::READ)));
LOG_INFO("[IO STATUS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
}
if (avg_size.at(i).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "group_id: %ld, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f",
io_config_.group_ids_.at(i-1),
avg_size.at(i).at(static_cast<int>(ObIOMode::WRITE)),
avg_iops.at(i).at(static_cast<int>(ObIOMode::WRITE)),
avg_rt.at(i).at(static_cast<int>(ObIOMode::WRITE)));
LOG_INFO("[IO STATUS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
IOStatusLog io_status_log(this, &avg_iops, &avg_size, &avg_rt);
bool need_print_io_status = false;
for (int64_t i = 0; !need_print_io_status && i < io_usage_.get_io_usage_num() && i < avg_size.count(); ++i) {
if (i == 0 || (i > 0 && !io_config_.group_configs_.at(i - 1).deleted_)) {
if (avg_size.at(i).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon() ||
avg_size.at(i).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
need_print_io_status = true;
}
}
}
// OTHER_GROUPS
if (avg_size.at(0).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "group_id: %ld, group_name: %s, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f",
0L,
"OTHER_GROUPS",
avg_size.at(0).at(static_cast<int>(ObIOMode::READ)),
avg_iops.at(0).at(static_cast<int>(ObIOMode::READ)),
avg_rt.at(0).at(static_cast<int>(ObIOMode::READ)));
LOG_INFO("[IO STATUS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
}
if (avg_size.at(0).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "group_id: %ld, group_name: %s, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f",
0L,
"OTHER_GROUPS",
avg_size.at(0).at(static_cast<int>(ObIOMode::WRITE)),
avg_iops.at(0).at(static_cast<int>(ObIOMode::WRITE)),
avg_rt.at(0).at(static_cast<int>(ObIOMode::WRITE)));
LOG_INFO("[IO STATUS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
if (need_print_io_status) {
LOG_INFO("[IO STATUS]", K_(tenant_id), K(io_status_log));
}
// MOCK SYS GROUPS
for (int64_t j = 0; j < sys_avg_size.count(); ++j) {
if (j >= sys_avg_size.count() || j >= sys_avg_iops.count() || j >= sys_avg_rt.count()) {
//ignore
} else {
ObIOModule module = static_cast<ObIOModule>(SYS_RESOURCE_GROUP_START_ID + j);
if (sys_avg_size.at(j).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "sys_group_name: %s, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f",
get_io_sys_group_name(module),
sys_avg_size.at(j).at(static_cast<int>(ObIOMode::READ)),
sys_avg_iops.at(j).at(static_cast<int>(ObIOMode::READ)),
sys_avg_rt.at(j).at(static_cast<int>(ObIOMode::READ)));
LOG_INFO("[IO STATUS SYS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
}
if (sys_avg_size.at(j).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
snprintf(io_status, sizeof(io_status), "sys_group_name: %s, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f",
get_io_sys_group_name(module),
sys_avg_size.at(j).at(static_cast<int>(ObIOMode::WRITE)),
sys_avg_iops.at(j).at(static_cast<int>(ObIOMode::WRITE)),
sys_avg_rt.at(j).at(static_cast<int>(ObIOMode::WRITE)));
LOG_INFO("[IO STATUS SYS]", K_(tenant_id), KCSTRING(io_status));
need_print_io_config = true;
class IOStatusSysLog
{
public:
IOStatusSysLog(ObTenantIOManager *io_manager, ObSysIOUsage::SysAvgItems *sys_avg_iops,
ObSysIOUsage::SysAvgItems *sys_avg_size, ObSysIOUsage::SysAvgItems *sys_avg_rt)
: io_manager_(io_manager), sys_avg_iops_(sys_avg_iops), sys_avg_size_(sys_avg_size), sys_avg_rt_(sys_avg_rt)
{}
~IOStatusSysLog()
{}
int64_t to_string(char *buf, const int64_t len) const
{
int64_t pos = 0;
for (int64_t j = 0; j < sys_avg_size_->count(); ++j) {
if (j >= sys_avg_size_->count() || j >= sys_avg_iops_->count() || j >= sys_avg_rt_->count()) {
// ignore
} else {
ObIOModule module = static_cast<ObIOModule>(SYS_RESOURCE_GROUP_START_ID + j);
if (sys_avg_size_->at(j).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"sys_group_name: %s, mode: read, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
get_io_sys_group_name(module),
sys_avg_size_->at(j).at(static_cast<int>(ObIOMode::READ)),
sys_avg_iops_->at(j).at(static_cast<int>(ObIOMode::READ)),
sys_avg_rt_->at(j).at(static_cast<int>(ObIOMode::READ)));
}
if (sys_avg_size_->at(j).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
databuff_printf(buf,
len,
pos,
"sys_group_name: %s, mode: write, size: %10.2f, iops: %8.2f, rt: %8.2f; ",
get_io_sys_group_name(module),
sys_avg_size_->at(j).at(static_cast<int>(ObIOMode::WRITE)),
sys_avg_iops_->at(j).at(static_cast<int>(ObIOMode::WRITE)),
sys_avg_rt_->at(j).at(static_cast<int>(ObIOMode::WRITE)));
}
}
}
return pos;
}
ObTenantIOManager *io_manager_;
ObSysIOUsage::SysAvgItems *sys_avg_iops_;
ObSysIOUsage::SysAvgItems *sys_avg_size_;
ObSysIOUsage::SysAvgItems *sys_avg_rt_;
};
ObSysIOUsage::SysAvgItems sys_avg_iops, sys_avg_size, sys_avg_rt;
io_backup_usage_.calculate_io_usage();
io_backup_usage_.get_io_usage(sys_avg_iops, sys_avg_size, sys_avg_rt);
IOStatusSysLog io_status_sys_log(this, &sys_avg_iops, &sys_avg_size, &sys_avg_rt);
bool need_print_sys_io_status = false;
for (int64_t j = 0; !need_print_sys_io_status && j < sys_avg_size.count(); ++j) {
if (sys_avg_size.at(j).at(static_cast<int>(ObIOMode::READ)) > std::numeric_limits<double>::epsilon() ||
sys_avg_size.at(j).at(static_cast<int>(ObIOMode::WRITE)) > std::numeric_limits<double>::epsilon()) {
need_print_sys_io_status = true;
}
}
if (need_print_sys_io_status) {
LOG_INFO("[IO STATUS SYS]", K_(tenant_id), K(io_status_sys_log));
}
if (need_print_io_config) {
if (need_print_io_status || need_print_sys_io_status) {
ObArray<int64_t> queue_count_array;
int ret = OB_SUCCESS;
if (OB_FAIL(callback_mgr_.get_queue_count(queue_count_array))) {
LOG_WARN("get callback queue count failed", K(ret));
}
LOG_INFO("[IO STATUS CONFIG]", K_(tenant_id), K_(ref_cnt), K_(io_config),
"allocated_memory", io_allocator_.get_allocated_size(),
"free_request_count", io_request_pool_.get_free_cnt(),
"free_result_count", io_result_pool_.get_free_cnt(),
"callback_queues", queue_count_array);
LOG_INFO("[IO STATUS CONFIG]",
K_(tenant_id),
K_(ref_cnt),
K_(io_config),
"allocated_memory",
io_allocator_.get_allocated_size(),
"free_request_count",
io_request_pool_.get_free_cnt(),
"free_result_count",
io_result_pool_.get_free_cnt(),
"callback_queues",
queue_count_array);
}
if (ATOMIC_LOAD(&io_config_.enable_io_tracer_)) {
io_tracer_.print_status();
@ -1605,3 +1686,26 @@ void ObTenantIOManager::dec_ref()
abort();
}
}
int ObTenantIOManager::get_throttled_time(uint64_t group_id, int64_t &throttled_time)
{
int ret = OB_SUCCESS;
int64_t current_throttled_time_us = -1;
if (OB_ISNULL(GCTX.cgroup_ctrl_) || !GCTX.cgroup_ctrl_->is_valid()) {
// do nothing
} else if (OB_FAIL(GCTX.cgroup_ctrl_->get_throttled_time(tenant_id_,
current_throttled_time_us,
group_id,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN("get throttled time failed", K(ret), K(tenant_id_), K(group_id));
} else if (current_throttled_time_us > 0) {
uint64_t idx = 0;
if (OB_FAIL(get_group_index(group_id, idx))) {
LOG_WARN("get group index failed", K(ret), K(group_id));
} else {
throttled_time = current_throttled_time_us - io_usage_.group_throttled_time_us_.at(idx);
io_usage_.group_throttled_time_us_.at(idx) = current_throttled_time_us;
}
}
return ret;
}
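Under cgroup v1, the throttled time that ObCgroupCtrl::get_throttled_time() returns comes from the throttled_time field of the group's cpu.stat file, reported in nanoseconds. A minimal standalone sketch of that read, assuming a v1 mount; the path and parsing are illustrative:
#include <cinttypes>
#include <cstdio>
#include <cstring>
// Sketch: read throttled_time (ns) from a cgroup v1 cpu.stat file
// and convert it to microseconds.
static int read_throttled_time_us(const char *group_path, int64_t &throttled_us) {
  char path[512];
  snprintf(path, sizeof(path), "%s/cpu.stat", group_path);
  FILE *fp = fopen(path, "r");
  if (nullptr == fp) {
    return -1;
  }
  char key[64];
  int64_t value = 0;
  int err = -1;
  while (fscanf(fp, "%63s %" SCNd64, key, &value) == 2) {
    if (0 == strcmp(key, "throttled_time")) {
      throttled_us = value / 1000; // ns -> us
      err = 0;
      break;
    }
  }
  fclose(fp);
  return err;
}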

View File

@ -168,6 +168,7 @@ public:
void print_io_status();
void inc_ref();
void dec_ref();
int get_throttled_time(uint64_t group_id, int64_t &throttled_time);
TO_STRING_KV(K(is_inited_), K(tenant_id_), K(ref_cnt_), K(io_memory_limit_), K(request_count_), K(result_count_),
K(io_config_), K(io_clock_), K(io_allocator_), KPC(io_scheduler_), K(callback_mgr_), K(io_memory_limit_),
K(request_count_), K(result_count_), K(io_request_pool_), K(io_result_pool_));

View File

@ -427,7 +427,8 @@ void ObIOStatDiff::reset()
/****************** IOUsage **********************/
ObIOUsage::ObIOUsage()
: io_stats_(),
: group_throttled_time_us_(),
io_stats_(),
io_estimators_(),
group_avg_iops_(),
group_avg_byte_(),
@ -454,6 +455,7 @@ int ObIOUsage::init(const int64_t group_num)
group_avg_iops_.count() != group_num_ ||
group_avg_byte_.count() != group_num_ ||
group_avg_rt_us_.count() != group_num_ ||
group_throttled_time_us_.count() != group_num_ ||
doing_request_count_.count() != group_num_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("init io usage failed", K(group_num_));
@ -472,6 +474,7 @@ int ObIOUsage::refresh_group_num(const int64_t group_num)
OB_FAIL(group_avg_iops_.reserve(group_num + 1)) ||
OB_FAIL(group_avg_byte_.reserve(group_num + 1)) ||
OB_FAIL(group_avg_rt_us_.reserve(group_num + 1)) ||
OB_FAIL(group_throttled_time_us_.reserve(group_num + 1)) ||
OB_FAIL(doing_request_count_.reserve(group_num + 1))) {
LOG_WARN("reserver group failed", K(ret), K(group_num));
} else {
@ -481,6 +484,7 @@ int ObIOUsage::refresh_group_num(const int64_t group_num)
ObSEArray<double, GROUP_START_NUM> cur_avg_iops;
ObSEArray<double, GROUP_START_NUM> cur_avg_byte;
ObSEArray<double, GROUP_START_NUM> cur_avg_rt_us;
ObSEArray<int64_t, GROUP_START_NUM> cur_throttled_time_us;
if (OB_FAIL(cur_stat_array.reserve(static_cast<int>(ObIOMode::MAX_MODE))) ||
OB_FAIL(cur_estimators_array.reserve(static_cast<int>(ObIOMode::MAX_MODE))) ||
@ -516,6 +520,8 @@ int ObIOUsage::refresh_group_num(const int64_t group_num)
LOG_WARN("push avg_byte array failed", K(ret), K(i));
} else if (OB_FAIL(group_avg_rt_us_.push_back(cur_avg_rt_us))) {
LOG_WARN("push avg_rt array failed", K(ret), K(i));
} else if (OB_FAIL(group_throttled_time_us_.push_back(0))) {
LOG_WARN("push throttled_time_us array failed", K(ret), K(i));
} else if (OB_FAIL(doing_request_count_.push_back(0))) {
LOG_WARN("push group_doing_req failed", K(ret), K(i));
} else {
@ -838,27 +844,38 @@ void ObIOTuner::run1()
}
}
void ObIOTuner::print_sender_status()
int64_t ObIOTuner::to_string(char *buf, const int64_t len) const
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < io_scheduler_.senders_.count(); ++i) {
int64_t pos = 0;
int tmp_ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCCESS == tmp_ret && i < io_scheduler_.senders_.count(); ++i) {
ObIOSender *sender = io_scheduler_.senders_.at(i);
if (OB_ISNULL(sender)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("io sender is null", K(ret), K(i));
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "io sender is null", K(i));
} else {
int64_t reservation_ts = 0;
int64_t group_limitation_ts = 0;
int64_t tenant_limitation_ts = 0;
int64_t proportion_ts = 0;
ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts);
if (OB_NOT_INIT != ret) {
LOG_INFO("[IO STATUS SENDER]", "send_index", sender->sender_index_, "req_count", sender->get_queue_count(),
K(reservation_ts), K(group_limitation_ts), K(tenant_limitation_ts), K(proportion_ts));
tmp_ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts);
if (OB_NOT_INIT != tmp_ret) {
databuff_printf(buf,
len,
pos,
"send_index: %ld, req_count: %ld, reservation_ts: %ld, group_limitation_ts: %ld, tenant_limitation_ts: "
"%ld, proportion_ts: %ld; ",
sender->sender_index_,
sender->get_queue_count(),
reservation_ts,
group_limitation_ts,
tenant_limitation_ts,
proportion_ts);
}
}
}
return pos;
}
int ObIOTuner::try_release_thread()
@ -884,6 +901,11 @@ int ObIOTuner::try_release_thread()
return ret;
}
void ObIOTuner::print_sender_status()
{
LOG_INFO("[IO STATUS SENDER]", K(*this));
}
void ObIOTuner::print_io_status()
{
int ret = OB_SUCCESS;
@ -3569,7 +3591,7 @@ int ObIOTracer::trace_request(const ObIORequest *req, const char *msg, const Tra
return ret;
}
void ObIOTracer::print_status()
int64_t ObIOTracer::to_string(char *buf, const int64_t len) const
{
struct UpdateFn {
int operator () (hash::HashMapPair<TraceInfo, int64_t> &entry) {
@ -3626,6 +3648,7 @@ void ObIOTracer::print_status()
}
} sort_fn;
int64_t pos = 0;
int ret = OB_SUCCESS;
CountFn counter;
if (OB_FAIL(counter.init())) {
@ -3641,12 +3664,17 @@ void ObIOTracer::print_status()
LOG_WARN("get max backtrace count failed", K(ret));
} else {
std::sort(trace_array.begin(), trace_array.end(), sort_fn);
LOG_INFO("[IO STATUS TRACER]", K_(tenant_id), "trace_request_count", counter.req_count_, "distinct_backtrace_count", trace_array.count());
databuff_printf(buf, len, pos, "trace_request_count: %ld, distinct_backtrace_count: %ld; ", counter.req_count_, trace_array.count());
const int64_t print_count = min(5, trace_array.count());
for (int64_t i = 0; OB_SUCC(ret) && i < print_count; ++i) {
const TraceItem &item = trace_array.at(i);
LOG_INFO("[IO STATUS TRACER]", "top", i + 1, "count", item.count_, "ref_log", item.trace_info_.ref_log_, "backtrace", item.trace_info_.bt_str_);
databuff_printf(buf, len, pos, "top: %ld, count: %ld, ref_log: %s, backtrace: %s; ", i + 1, item.count_, to_cstring(item.trace_info_.ref_log_), item.trace_info_.bt_str_);
}
}
}
return pos;
}
void ObIOTracer::print_status()
{
LOG_INFO("[IO STATUS TRACER]", K_(tenant_id), K(*this));
}
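ObIOTuner and ObIOTracer follow the same refactor as the ThrottledTimeLog and IOStatusLog helpers earlier in this commit: the per-item LOG_INFO loop moves into a to_string() that accumulates output with databuff_printf, so a single LOG_INFO with K(*this) emits one consolidated line. A minimal sketch of the idiom (class and field names are illustrative):
// Any type exposing int64_t to_string(char *, const int64_t) const can be
// logged via K(obj); the logging framework invokes to_string into its buffer.
class SenderStatusLog {
public:
  int64_t to_string(char *buf, const int64_t len) const {
    int64_t pos = 0;
    for (int64_t i = 0; i < sender_count_; ++i) {
      // databuff_printf appends at pos and advances it, truncating safely.
      databuff_printf(buf, len, pos, "send_index: %ld; ", i);
    }
    return pos; // bytes written
  }
  int64_t sender_count_ = 0;
};
// usage: LOG_INFO("[IO STATUS SENDER]", K(log)); one line instead of one per sender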

View File

@ -152,6 +152,7 @@ public:
void record_request_finish(ObIOResult &result);
bool is_request_doing(const int64_t index) const;
int64_t get_io_usage_num() const;
ObSEArray<int64_t, GROUP_START_NUM> group_throttled_time_us_;
int64_t to_string(char* buf, const int64_t buf_len) const;
private:
ObSEArray<ObSEArray<ObIOStat, GROUP_START_NUM>, 2> io_stats_;
@ -204,7 +205,7 @@ public:
void destroy();
int send_detect_task();
virtual void run1() override;
int64_t to_string(char *buf, const int64_t len) const;
private:
void print_sender_status();
int try_release_thread();
@ -570,6 +571,7 @@ public:
void reuse();
int trace_request(const ObIORequest *req, const char *msg, const TraceType trace_type);
void print_status();
int64_t to_string(char *buf, const int64_t len) const;
private:
bool is_inited_;
uint64_t tenant_id_;

View File

@ -1623,6 +1623,13 @@ ERRSIM_DEF_STR(errsim_migration_src_server_addr, OB_CLUSTER_PARAMETER, "",
DEF_BOOL(enable_cgroup, OB_CLUSTER_PARAMETER, "True",
"when set to false, cgroup will not init; when set to true but cgroup root dir is not ready, print ERROR",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
DEF_BOOL(enable_global_background_resource_isolation, OB_CLUSTER_PARAMETER, "False",
"When set to false, foreground and background tasks are isolated within the tenant; When set to true, isolate background tasks individually upon tenant-level",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
DEF_DBL(global_background_cpu_quota, OB_CLUSTER_PARAMETER, "-1", "[-1,)",
"When enable_global_background_resource_isolation is True, specify the number of vCPUs allocated to the background tasks"
"-1 for the CPU is not limited by the cgroup",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
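Both parameters above are consumed through GCONF at every call site this patch touches; the recurring idiom, extracted here for reference:
// Call-site pattern recurring throughout this commit: route cgroup
// operations to the background tree only when isolation is enabled.
const char *base_path =
    GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : "";
// A non-positive quota collapses to -1, i.e. no cgroup CPU limit.
double cpu = static_cast<double>(GCONF.global_background_cpu_quota);
if (cpu <= 0) {
  cpu = -1;
}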
DEF_BOOL(enable_user_defined_rewrite_rules, OB_TENANT_PARAMETER, "False",
"specify whether the user defined rewrite rules are enabled. "
"Value: True: enable False: disable",

File diff suppressed because it is too large

View File

@ -15,6 +15,7 @@
#include <stdint.h>
#include <sys/types.h>
#include "share/io/ob_io_define.h"
namespace oceanbase
{
namespace common
@ -144,66 +145,59 @@ public:
bool is_valid() { return valid_; }
bool is_valid_group_name(common::ObString &group_name);
static int compare_cpu(const double cpu1, const double cpu2, int &compare_ret);
// Remove the tenant's cgroup rules
int remove_tenant_cgroup(const uint64_t tenant_id);
int remove_cgroup(const uint64_t tenant_id, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
int remove_both_cgroup(const uint64_t tenant_id, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
static int remove_dir_(const char *curr_dir);
int add_self_to_cgroup(const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int add_self_to_cgroup(const uint64_t tenant_id, uint64_t group_id = OBCG_DEFAULT, const char *base_path = "");
// Remove the specified tid from the specified tenant's cgroup
int remove_self_from_cgroup(const uint64_t tenant_id);
// Binding interface for background threads
int add_self_to_group(const uint64_t tenant_id,
const uint64_t group_id);
static int get_cgroup_config_(const char *group_path, const char *config_name, char *config_value);
static int set_cgroup_config_(const char *group_path, const char *config_name, char *config_value);
// Set cpu.shares of the specified tenant cgroup
int set_cpu_shares(const int32_t cpu_shares, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int get_cpu_shares(int32_t &cpu_shares, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int set_cpu_shares(const uint64_t tenant_id,
const int level,
const common::ObString &group,
const int32_t cpu_shares);
int set_cpu_shares(const uint64_t tenant_id, const double cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
int set_both_cpu_shares(const uint64_t tenant_id, const double cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
int get_cpu_shares(const uint64_t tenant_id, double &cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
// Set cpu.cfs_quota_us of the specified tenant cgroup
int set_cpu_cfs_quota(const int32_t cfs_quota_us, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int get_cpu_cfs_quota(int32_t &cfs_quota_us, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int set_cpu_cfs_quota(const uint64_t tenant_id,
const int level,
const common::ObString &group,
const int32_t cfs_quota_us);
// Get the cfs period of a tenant group, used to compute cfs_quota_us
int get_cpu_cfs_period(const uint64_t tenant_id,
const int level,
const common::ObString &group,
int32_t &cfs_period_us);
int get_cpu_cfs_period(int32_t &cfs_period_us,const uint64_t tenant_id, int64_t group_id);
int set_cpu_cfs_quota(const uint64_t tenant_id, const double cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
static int set_cpu_cfs_quota_by_path_(const char *group_path, const double cpu);
static int get_cpu_cfs_quota_by_path_(const char *group_path, double &cpu);
static int dec_cpu_cfs_quota_(const char *curr_path, const double cpu);
int recursion_dec_cpu_cfs_quota_(const char *curr_path, const double cpu);
int set_both_cpu_cfs_quota(const uint64_t tenant_id, const double cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
int get_cpu_cfs_quota(const uint64_t tenant_id, double &cpu, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
// Get cpuacct.usage of a cgroup, i.e. its CPU time
int get_cpu_time(const uint64_t tenant_id, int64_t &cpu_time);
// Get the CPU usage over a recent time window
int get_cpu_usage(const uint64_t tenant_id, int32_t &cpu_usage);
int get_cpu_time(const uint64_t tenant_id, int64_t &cpu_time, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
int get_throttled_time(const uint64_t tenant_id, int64_t &throttled_time, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = "");
// Set the IOPS of the specified tenant cgroup; updated directly into the tenant io_config
int set_group_iops(const uint64_t tenant_id,
const int level, // UNUSED
const int64_t group_id,
const uint64_t group_id,
const OBGroupIOInfo &group_io);
// Deleting the plan in use is reflected to the IO layer: reset all IOPS
int reset_all_group_iops(const uint64_t tenant_id,
const int level);// UNUSED
int reset_all_group_iops(const uint64_t tenant_id);
// Deleting a directive is reflected to the IO layer: reset its IOPS
int reset_group_iops(const uint64_t tenant_id,
const int level, // UNUSED
const common::ObString &consumer_group);
// Deleting a group is reflected to the IO layer: deactivate the corresponding group structure
int delete_group_iops(const uint64_t tenant_id,
const int level, // UNUSED
const common::ObString &consumer_group);
int get_group_info_by_group_id(const uint64_t tenant_id,
uint64_t group_id,
share::ObGroupName &group_name);
class DirProcessor
{
public:
DirProcessor() = default;
~DirProcessor() = default;
virtual int handle_dir(const char *group_path) = 0;
};
// Dynamically create cgroups according to consumer groups
int create_user_tenant_group_dir(
const uint64_t tenant_id,
int level,
const common::ObString &group);
private:
const char *root_cgroup_ = "cgroup";
const char *other_cgroup_ = "cgroup/other";
@ -211,7 +205,7 @@ private:
static const int32_t DEFAULT_SYS_SHARE = 1024;
static const int32_t DEFAULT_USER_SHARE = 4096;
static const int32_t PATH_BUFSIZE = 512;
static const int32_t VALUE_BUFSIZE = 32;
static const int32_t VALUE_BUFSIZE = 64;
static const int32_t GROUP_NAME_BUFSIZE = 129;
// Before using ObCgroupCtrl, check whether the group_ctrl object is valid; if false, skip the cgroup mechanism.
// It may be false because the cgroup directory lacks operation permissions, the OS does not support cgroup, etc.
@ -221,46 +215,20 @@ private:
private:
int init_cgroup_root_dir_(const char *cgroup_path);
int init_cgroup_dir_(const char *cgroup_path);
int write_string_to_file_(const char *filename, const char *content);
int get_string_from_file_(const char *filename, char content[VALUE_BUFSIZE]);
int get_task_path(
char *task_path,
int path_bufsize,
const uint64_t tenant_id,
int level,
const char *group);
int get_task_path(
char *task_path,
int path_bufsize,
const uint64_t tenant_id,
int level,
const common::ObString &group);
static int init_dir_(const char *curr_dir);
static int init_full_dir_(const char *curr_path);
static int write_string_to_file_(const char *filename, const char *content);
static int get_string_from_file_(const char *filename, char content[VALUE_BUFSIZE]);
int get_group_path(
char *group_path,
int path_bufsize,
const uint64_t tenant_id,
int64_t group_id = INT64_MAX);
int get_group_path(
char *group_path,
int path_bufsize,
const uint64_t tenant_id,
int level,
const char *group);
int get_group_path(
char *group_path,
int path_bufsize,
const uint64_t tenant_id,
int level,
const common::ObString &group);
int set_cpu_shares(const char *cgroup_path, const int32_t cpu_shares);
int get_group_info_by_group_id(const uint64_t tenant_id,
int64_t group_id,
share::ObGroupName &group_name);
uint64_t group_id = OB_INVALID_GROUP_ID,
const char *base_path = "");
enum { NOT_DIR = 0, LEAF_DIR, REGULAR_DIR };
int which_type_dir_(const char *curr_path, int &result);
int remove_dir_(const char *curr_dir);
int recursion_remove_group_(const char *curr_path);
int recursion_process_group_(const char *curr_path, DirProcessor *processor_ptr);
};
} // share

View File

@ -183,9 +183,7 @@ int ObResourceManagerProxy::delete_plan(
// Deleting a plan that is not currently in use: do nothing
} else {
// Deleting the plan currently in use: clear all of its IO resources
if (OB_FAIL(GCTX.cgroup_ctrl_->reset_all_group_iops(
tenant_id,
1))) {
if (OB_FAIL(GCTX.cgroup_ctrl_->reset_all_group_iops(tenant_id))) {
LOG_WARN("reset cur plan group directive failed",K(plan), K(ret));
} else if (OB_FAIL(reset_all_mapping_rules())) {
LOG_WARN("reset hashmap failed when delete using plan");
@ -211,8 +209,9 @@ int ObResourceManagerProxy::allocate_consumer_group_id(
ObSqlString sql;
const char *tname = OB_ALL_RES_MGR_CONSUMER_GROUP_TNAME;
if (OB_FAIL(sql.assign_fmt(
"SELECT /* ALLOC_MAX_GROUP_ID */ COALESCE(MAX(CONSUMER_GROUP_ID) + 1, 10000) AS NEXT_GROUP_ID FROM %s "
"SELECT /* ALLOC_MAX_GROUP_ID */ COALESCE(MAX(CONSUMER_GROUP_ID) + 1, %lu) AS NEXT_GROUP_ID FROM %s "
"WHERE TENANT_ID = %ld",
USER_RESOURCE_GROUP_START_ID,
tname, ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
LOG_WARN("fail format sql", K(ret));
} else if (OB_FAIL(sql_client_retry_weak.read(res, tenant_id, sql.ptr()))) {
@ -352,8 +351,26 @@ int ObResourceManagerProxy::delete_consumer_group(
if (OB_SUCC(ret)) {
// io_control is stopped right after the inner SQL here because the deleted group's info can no longer be read from the internal table
if (OB_FAIL(GCTX.cgroup_ctrl_->delete_group_iops(tenant_id, 1, consumer_group))) {
uint64_t group_id = 0;
share::ObGroupName group_name;
group_name.set_value(consumer_group);
ObResourceMappingRuleManager &rule_mgr = G_RES_MGR.get_mapping_rule_mgr();
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_CONFIG;
LOG_WARN("invalid config", K(ret), K(tenant_id));
} else if (OB_FAIL(rule_mgr.get_group_id_by_name(tenant_id, group_name, group_id))) {
if (OB_HASH_NOT_EXIST == ret) {
// The group is deleted right after creation; it may not have been flushed to the storage layer yet or the plan has not taken effect, so skip the remaining steps
ret = OB_SUCCESS;
LOG_INFO("delete group success with no_releated_io_module", K(consumer_group), K(tenant_id));
} else {
LOG_WARN("fail get group id", K(ret), K(group_id), K(consumer_group));
}
} else if (OB_FAIL(GCTX.cgroup_ctrl_->delete_group_iops(tenant_id, consumer_group))) {
LOG_WARN("fail to stop cur iops isolation", K(ret), K(tenant_id), K(consumer_group));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->remove_both_cgroup(
tenant_id, group_id, GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN("fail to remove group cgroup", K(ret), K(tenant_id), K(consumer_group), K(group_id));
}
}
return ret;
@ -1146,7 +1163,6 @@ int ObResourceManagerProxy::delete_plan_directive(
// The stop happens right after the inner SQL here because the deleted group's info can no longer be read from the internal table
if (OB_FAIL(GCTX.cgroup_ctrl_->reset_group_iops(
tenant_id,
1,
group))) {
LOG_WARN("reset deleted group directive failed", K(ret), K(group));
}

View File

@ -65,7 +65,6 @@ int ObPlanDirective::assign(const ObPlanDirective &other)
max_iops_ = other.max_iops_;
weight_iops_ = other.weight_iops_;
group_id_ = other.group_id_;
level_ = other.level_;
ret = group_name_.assign(other.group_name_);
return ret;
}

View File

@ -162,14 +162,13 @@ class ObPlanDirective
public:
ObPlanDirective() :
tenant_id_(common::OB_INVALID_ID),
mgmt_p1_(100),
utilization_limit_(100),
mgmt_p1_(1),
utilization_limit_(1),
min_iops_(0),
max_iops_(100),
weight_iops_(0),
group_id_(),
group_name_(),
level_(1)
group_name_()
{}
~ObPlanDirective() = default;
public:
@ -226,18 +225,16 @@ public:
K_(min_iops),
K_(max_iops),
K_(weight_iops),
K_(group_id),
K_(level));
K_(group_id));
public:
uint64_t tenant_id_;
int64_t mgmt_p1_;
int64_t utilization_limit_;
double mgmt_p1_;
double utilization_limit_;
uint64_t min_iops_;
uint64_t max_iops_;
uint64_t weight_iops_;
uint64_t group_id_;
share::ObGroupName group_name_;
int level_;
private:
DISALLOW_COPY_AND_ASSIGN(ObPlanDirective);
};

View File

@ -17,6 +17,7 @@
#include "share/resource_manager/ob_resource_manager_proxy.h"
#include "share/resource_manager/ob_cgroup_ctrl.h"
#include "observer/ob_server_struct.h"
#include "observer/omt/ob_multi_tenant.h"
using namespace oceanbase::common;
@ -58,9 +59,7 @@ int ObResourcePlanManager::switch_resource_plan(const uint64_t tenant_id, ObStri
} else if (origin_plan != cur_plan) {
// switch plan: reset the IO resources of the directives under the original plan
ObResourceManagerProxy proxy;
if (OB_FAIL(GCTX.cgroup_ctrl_->reset_all_group_iops(
tenant_id,
1))) {
if (OB_FAIL(GCTX.cgroup_ctrl_->reset_all_group_iops(tenant_id))) {
LOG_ERROR("reset old plan group directive failed", K(tenant_id), K(ret));
}
if (OB_SUCC(ret) && plan_name.empty()) {
@ -69,8 +68,26 @@ int ObResourcePlanManager::switch_resource_plan(const uint64_t tenant_id, ObStri
LOG_WARN("fail reset all group rules",K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(tenant_plan_map_.set_refactored(tenant_id, cur_plan, 1))) { //overwrite
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id));
} else {
int tmp_ret = OB_SUCCESS;
for (int64_t i = 0; i < tenant_holder.get_ptr()->get_group_num(); i++) {
uint64_t group_id = tenant_holder.get_ptr()->get_io_config().group_ids_.at(i);
if (GCTX.cgroup_ctrl_->is_valid() &&
OB_TMP_FAIL(GCTX.cgroup_ctrl_->remove_both_cgroup(
tenant_id, group_id, GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN("remove tenant cgroup failed", K(tmp_ret), K(tenant_id));
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(tenant_plan_map_.set_refactored(tenant_id, cur_plan, 1))) { // overwrite
LOG_WARN("set plan failed", K(ret), K(tenant_id));
} else {
LOG_INFO("switch resource plan success", K(tenant_id), K(origin_plan), K(cur_plan));
@ -80,6 +97,76 @@ int ObResourcePlanManager::switch_resource_plan(const uint64_t tenant_id, ObStri
return ret;
}
int ObResourcePlanManager::refresh_global_background_cpu()
{
int ret = OB_SUCCESS;
int32_t cfs_period_us = 0;
if (GCONF.enable_global_background_resource_isolation) {
double cpu = static_cast<double>(GCONF.global_background_cpu_quota);
if (cpu <= 0) {
cpu = -1;
}
if (cpu >= 0 && OB_FAIL(GCTX.cgroup_ctrl_->set_cpu_shares( // set cgroup/background/cpu.shares
OB_INVALID_TENANT_ID,
cpu,
OB_INVALID_GROUP_ID,
BACKGROUND_CGROUP))) {
LOG_WARN("fail to set background cpu shares", K(ret));
}
int compare_ret = 0;
if (OB_SUCC(ret) && OB_SUCC(GCTX.cgroup_ctrl_->compare_cpu(background_quota_, cpu, compare_ret))) {
if (0 == compare_ret) {
// do nothing
} else if (OB_FAIL(GCTX.cgroup_ctrl_->set_cpu_cfs_quota( // set cgroup/background/cpu.cfs_quota_us
OB_INVALID_TENANT_ID,
cpu,
OB_INVALID_GROUP_ID,
BACKGROUND_CGROUP))) {
LOG_WARN("fail to set background cpu cfs quota", K(ret));
} else {
if (compare_ret < 0) {
int tmp_ret = OB_SUCCESS;
omt::TenantIdList ids;
GCTX.omt_->get_tenant_ids(ids);
for (uint64_t i = 0; i < ids.size(); i++) {
uint64_t tenant_id = ids[i];
double target_cpu = -1;
if (!is_virtual_tenant_id(tenant_id)) {
MTL_SWITCH(tenant_id)
{
target_cpu = MTL_CTX()->unit_max_cpu();
}
}
if (OB_TMP_FAIL(GCTX.cgroup_ctrl_->compare_cpu(target_cpu, cpu, compare_ret))) {
LOG_WARN_RET(tmp_ret, "compare tenant cpu failed", K(tmp_ret), K(tenant_id));
} else if (compare_ret > 0) {
target_cpu = cpu;
}
if (OB_TMP_FAIL(GCTX.cgroup_ctrl_->set_cpu_cfs_quota(tenant_id,
target_cpu,
OB_INVALID_GROUP_ID,
BACKGROUND_CGROUP))) {
LOG_WARN_RET(tmp_ret, "set tenant cpu cfs quota failed", K(tmp_ret), K(tenant_id));
} else if (OB_TMP_FAIL(GCTX.cgroup_ctrl_->set_cpu_cfs_quota(
tenant_id, target_cpu, USER_RESOURCE_OTHER_GROUP_ID, BACKGROUND_CGROUP))) {
LOG_WARN_RET(tmp_ret, "set tenant cpu cfs quota failed", K(ret), K(tenant_id));
} else if (is_user_tenant(tenant_id)) {
uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_TMP_FAIL(GCTX.cgroup_ctrl_->set_cpu_cfs_quota(
meta_tenant_id, target_cpu, OB_INVALID_GROUP_ID, BACKGROUND_CGROUP))) {
LOG_WARN_RET(tmp_ret, "set tenant cpu cfs quota failed", K(tmp_ret), K(meta_tenant_id));
}
}
}
}
background_quota_ = cpu;
}
}
}
return ret;
}
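When the global background quota is raised (compare_ret < 0), the loop above re-derives every tenant's background cfs quota as the smaller of its unit_max_cpu and the new global value, with -1 meaning unlimited on either side. The clamp in isolation, as a sketch:
// Effective per-tenant background quota, as applied in the loop above:
// cap the tenant's own unit_max_cpu at the global background quota.
static double clamp_background_cpu(const double tenant_max_cpu,
                                   const double global_background_cpu) {
  if (global_background_cpu < 0) {
    return tenant_max_cpu; // global side unlimited
  }
  if (tenant_max_cpu < 0 || tenant_max_cpu > global_background_cpu) {
    return global_background_cpu; // clamp to the global quota
  }
  return tenant_max_cpu;
}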
int ObResourcePlanManager::refresh_resource_plan(const uint64_t tenant_id, ObString &plan_name)
{
int ret = OB_SUCCESS;
@ -110,16 +197,18 @@ int ObResourcePlanManager::refresh_resource_plan(const uint64_t tenant_id, ObStr
// step1: normalize using 100 as the total
// step2: convert the values into cgroup values (utilization => cfs_cpu_quota depends on the CPU core count, etc.)
// - if utilization = 100, then cfs_cpu_quota = -1
} else if (OB_FAIL(create_cgroup_dir_if_not_exist(directives))) {
LOG_WARN("fail create cgroup dir", K(directives), K(ret));
} else if (OB_FAIL(refresh_global_background_cpu())) {
LOG_WARN("fail refresh background cpu quota", K(ret));
} else if (OB_FAIL(normalize_cpu_directives(directives))) {
LOG_WARN("fail normalize directive", K(ret));
} else if (OB_FAIL(flush_directive_to_cgroup_fs(directives))) { // for CPU
} else if (OB_FAIL(flush_directive_to_cgroup_fs(directives))) { // for CPU
LOG_WARN("fail flush directive to cgroup fs", K(ret));
}
}
if (OB_SUCC(ret)) {
LOG_INFO("refresh resource plan success", K(tenant_id), K(plan_name), K(directives));
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) { // 10s
LOG_INFO("refresh resource plan success", K(tenant_id), K(plan_name), K(directives));
}
}
return ret;
}
@ -146,12 +235,10 @@ int ObResourcePlanManager::get_cur_plan(const uint64_t tenant_id, ObResMgrVarcha
int ObResourcePlanManager::normalize_cpu_directives(ObPlanDirectiveSet &directives)
{
int ret = OB_SUCCESS;
int64_t total_mgmt = 0;
int64_t total_util = 0;
double total_mgmt = 0;
for (int64_t i = 0; i < directives.count(); ++i) {
const ObPlanDirective &d = directives.at(i);
total_mgmt += d.mgmt_p1_;
total_util += d.utilization_limit_;
}
// compute the cfs_cpu_quota value
@ -176,29 +263,24 @@ int ObResourcePlanManager::normalize_cpu_directives(ObPlanDirectiveSet &directiv
if (0 == total_mgmt) {
d.mgmt_p1_ = 0;
} else {
d.mgmt_p1_ = 100 * d.mgmt_p1_ / total_mgmt;
d.mgmt_p1_ = d.mgmt_p1_ / total_mgmt;
}
if (0 == total_util) {
d.utilization_limit_ = 0;
double tenant_shares_cpu = 0;
int32_t cfs_period_us = 0;
if (OB_FAIL(GCTX.cgroup_ctrl_->get_cpu_shares(
d.tenant_id_,
tenant_shares_cpu,
OB_INVALID_GROUP_ID,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP
: ""))) {
LOG_WARN("fail get cpu shares", K(d), K(ret));
} else {
int32_t tenant_cpu_shares = 0;
int32_t cfs_period_us = 0;
if (OB_FAIL(GCTX.cgroup_ctrl_->get_cpu_shares(tenant_cpu_shares, d.tenant_id_))) {
LOG_WARN("fail get cpu shares", K(d), K(ret));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->get_cpu_cfs_period(
d.tenant_id_,
d.level_,
d.group_name_,
cfs_period_us))) {
LOG_WARN("fail get cpu cfs period", K(d), K(ret));
if (d.utilization_limit_ == 100) {
// unlimited
d.utilization_limit_ = -1;
} else {
if (d.utilization_limit_ == 100) {
// unlimited
d.utilization_limit_ = -1;
} else {
d.utilization_limit_ =
(int64_t)cfs_period_us * tenant_cpu_shares * d.utilization_limit_ / 100 / 1024;
}
d.utilization_limit_ =
tenant_shares_cpu * d.utilization_limit_ / 100;
}
}
}
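ObPlanDirective's mgmt_p1_ and utilization_limit_ changed from int64_t percentages to doubles: mgmt_p1_ is normalized to a fraction of the plan total, and utilization_limit_ becomes an absolute vCPU count scaled by the tenant's shares, with 100% mapping to -1 (unlimited). A worked sketch of the new arithmetic, with illustrative values in the comment:
// e.g. directives with mgmt_p1 = 30 and 10 normalize to 0.75 and 0.25;
// utilization_limit = 50 on a tenant holding 8 vCPUs => 4.0 vCPUs of quota.
static void normalize_directive(double &mgmt_p1, double &utilization_limit,
                                const double total_mgmt, const double tenant_shares_cpu) {
  mgmt_p1 = (0 == total_mgmt) ? 0 : mgmt_p1 / total_mgmt;
  if (100 == utilization_limit) {
    utilization_limit = -1; // 100% => unlimited (cfs_quota_us = -1)
  } else {
    utilization_limit = tenant_shares_cpu * utilization_limit / 100;
  }
}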
@ -253,27 +335,6 @@ int ObResourcePlanManager::normalize_iops_directives(const uint64_t tenant_id,
return ret;
}
int ObResourcePlanManager::create_cgroup_dir_if_not_exist(const ObPlanDirectiveSet &directives)
{
int ret = OB_SUCCESS;
share::ObCgroupCtrl *cgroup_ctrl = GCTX.cgroup_ctrl_;
if (OB_ISNULL(cgroup_ctrl) || !cgroup_ctrl->is_valid()) {
ret = OB_NOT_INIT;
} else {
for (int64_t i = 0; i < directives.count(); ++i) {
const ObPlanDirective &d = directives.at(i);
if (OB_FAIL(cgroup_ctrl->create_user_tenant_group_dir(
d.tenant_id_,
d.level_,
d.group_name_))) {
LOG_WARN("fail init user tenant group", K(d), K(ret));
}
}
}
return ret;
}
/*
* cpu share
* cfs_quota_us cpu
@ -313,20 +374,22 @@ int ObResourcePlanManager::flush_directive_to_cgroup_fs(ObPlanDirectiveSet &dire
int ret = OB_SUCCESS;
for (int64_t i = 0; i < directives.count(); ++i) {
const ObPlanDirective &d = directives.at(i);
if (OB_FAIL(GCTX.cgroup_ctrl_->set_cpu_shares(
d.tenant_id_,
d.level_,
d.group_name_,
static_cast<int32_t>(d.mgmt_p1_)))) {
LOG_ERROR("fail set cpu shares. tenant isolation function may not functional!!",
K(d), K(ret));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->set_cpu_cfs_quota(
d.tenant_id_,
d.level_,
d.group_name_,
static_cast<int32_t>(d.utilization_limit_)))) {
LOG_ERROR("fail set cpu quota. tenant isolation function may not functional!!",
K(d), K(ret));
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(d.tenant_id_, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(d.tenant_id_));
} else if (!tenant_holder.get_ptr()->get_io_config().group_configs_.at(i).deleted_ &&
!tenant_holder.get_ptr()->get_io_config().group_configs_.at(i).cleared_) {
if (OB_FAIL(GCTX.cgroup_ctrl_->set_both_cpu_shares(d.tenant_id_,
d.mgmt_p1_,
d.group_id_,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_ERROR("fail set cpu shares. tenant isolation function may not functional!!", K(d), K(ret));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->set_both_cpu_cfs_quota(d.tenant_id_,
d.utilization_limit_,
d.group_id_,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_ERROR("fail set cpu quota. tenant isolation function may not functional!!", K(d), K(ret));
}
}
// ignore ret, continue
}
@ -349,7 +412,6 @@ int ObResourcePlanManager::flush_directive_to_iops_control(const uint64_t tenant
LOG_ERROR("fail init group io info", K(cur_directive), K(ret));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->set_group_iops(
cur_directive.tenant_id_,
cur_directive.level_,
cur_directive.group_id_,
cur_io_info))) {
LOG_ERROR("fail set iops. tenant isolation function may not functional!!",
@ -365,7 +427,6 @@ int ObResourcePlanManager::flush_directive_to_iops_control(const uint64_t tenant
LOG_ERROR("fail init other group io info", K(other_group_directive), K(ret));
} else if (OB_FAIL(GCTX.cgroup_ctrl_->set_group_iops(
other_group_directive.tenant_id_,
other_group_directive.level_,
other_group_directive.group_id_,
other_io_info))) {
LOG_ERROR("fail set iops. tenant isolation function may not functional!!",

View File

@ -28,16 +28,17 @@ class ObString;
namespace share
{
static constexpr int64_t OTHER_GROUPS_IOPS_WEIGHT = 100L;
class ObResourcePlanManager
{
public:
typedef common::ObSEArray<ObPlanDirective, 8> ObPlanDirectiveSet;
public:
ObResourcePlanManager() = default;
ObResourcePlanManager() : tenant_plan_map_(), background_quota_(INT32_MAX)
{}
virtual ~ObResourcePlanManager() = default;
int init();
int refresh_resource_plan(const uint64_t tenant_id, common::ObString &plan_name);
int refresh_global_background_cpu();
int get_cur_plan(const uint64_t tenant_id, ObResMgrVarcharValue &plan_name);
private:
/* functions */
@ -46,13 +47,13 @@ private:
int flush_directive_to_iops_control(const uint64_t tenant_id,
ObPlanDirectiveSet &directives,
ObPlanDirective &other_group_directive);
int create_cgroup_dir_if_not_exist(const ObPlanDirectiveSet &directives);
int normalize_cpu_directives(ObPlanDirectiveSet &directives);
int normalize_iops_directives(const uint64_t tenant_id,
ObPlanDirectiveSet &directives,
ObPlanDirective &other_group_directive);
int refresh_tenant_group_io_config(const uint64_t tenant_id);
common::hash::ObHashMap<uint64_t, ObResMgrVarcharValue> tenant_plan_map_;
int32_t background_quota_;
/* variables */
DISALLOW_COPY_AND_ASSIGN(ObResourcePlanManager);
};

View File

@ -468,7 +468,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type)
dag_ret_(OB_SUCCESS),
add_time_(0),
start_time_(0),
consumer_group_id_(0),
consumer_group_id_(USER_RESOURCE_OTHER_GROUP_ID),
error_location_(),
allocator_(nullptr),
is_inited_(false),
@ -528,7 +528,7 @@ void ObIDag::clear_running_info()
{
add_time_ = 0;
start_time_ = 0;
consumer_group_id_ = 0;
consumer_group_id_ = USER_RESOURCE_OTHER_GROUP_ID;
running_task_cnt_ = 0;
dag_status_ = ObDagStatus::DAG_STATUS_INITING;
dag_ret_ = OB_SUCCESS;
@ -1576,7 +1576,7 @@ ObTenantDagWorker::ObTenantDagWorker()
check_period_(0),
last_check_time_(0),
function_type_(0),
group_id_(0),
group_id_(OB_INVALID_GROUP_ID),
tg_id_(-1),
hold_by_compaction_dag_(false),
is_inited_(false)
@ -1647,7 +1647,7 @@ void ObTenantDagWorker::reset()
check_period_ = 0;
last_check_time_ = 0;
function_type_ = 0;
group_id_ = 0;
group_id_ = OB_INVALID_GROUP_ID;
self_ = NULL;
is_inited_ = false;
TG_DESTROY(tg_id_);
@ -1668,17 +1668,23 @@ void ObTenantDagWorker::resume()
int ObTenantDagWorker::set_dag_resource(const uint64_t group_id)
{
int ret = OB_SUCCESS;
uint64_t consumer_group_id = 0;
if (group_id != 0) {
uint64_t consumer_group_id = USER_RESOURCE_OTHER_GROUP_ID;
if (is_user_group(group_id)) {
//user level
consumer_group_id = group_id;
} else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, consumer_group_id))) {
//function level
LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(consumer_group_id));
}
if (OB_SUCC(ret) && consumer_group_id != group_id_) {
// for CPU isolation, depend on cgroup
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), consumer_group_id))) {
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() &&
OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_cgroup(
MTL_ID(),
consumer_group_id,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP
: ""))) {
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id));
} else {
// for IOPS isolation, only depend on consumer_group_id

View File

@ -1570,7 +1570,10 @@ int ObBlockManager::set_group_id(const uint64_t tenant_id)
LOG_WARN("fail to get group id by function", K(ret), K(tenant_id), K(consumer_group_id));
} else if (consumer_group_id != group_id_) {
// for CPU isolation, depend on cgroup
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(tenant_id, consumer_group_id))) {
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() &&
OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id,
consumer_group_id,
GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) {
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(tenant_id), K(consumer_group_id));
}
}
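add_self_to_cgroup() binds the calling thread to the target group; under cgroup v1 this amounts to appending the thread id to the group's tasks file. A hedged standalone sketch (path and error handling are illustrative):
#include <cstdio>
#include <sys/syscall.h>
#include <unistd.h>
// cgroup v1 binding sketch: writing a tid into <group>/tasks moves that
// thread into the group.
static int add_self_to_cgroup_sketch(const char *group_path) {
  char path[512];
  snprintf(path, sizeof(path), "%s/tasks", group_path);
  FILE *fp = fopen(path, "a");
  if (nullptr == fp) {
    return -1;
  }
  const long tid = syscall(SYS_gettid); // same kernel tid as GETTID()
  const int err = (fprintf(fp, "%ld", tid) > 0) ? 0 : -1;
  fclose(fp);
  return err;
}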

View File

@ -84,6 +84,7 @@ enable_crazy_medium_compaction
enable_dblink
enable_ddl
enable_early_lock_release
enable_global_background_resource_isolation
enable_kv_ttl
enable_major_freeze
enable_monotonic_weak_read
@ -109,6 +110,7 @@ enable_user_defined_rewrite_rules
external_kms_info
freeze_trigger_percentage
fuse_row_cache_priority
global_background_cpu_quota
ha_high_thread_score
ha_low_thread_score
ha_mid_thread_score

View File

@ -46,7 +46,7 @@ TEST(TestCgroupCtrl, AddDelete)
for (int i = 0; i < 4; i++) {
pthread_join(ts[i], nullptr);
}
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_tenant_cgroup(1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_cgroup(1001));
}
TEST(TestCgroupCtrl, SetGetValue)
@ -63,14 +63,14 @@ TEST(TestCgroupCtrl, SetGetValue)
const int32_t cpu_shares = 2048;
int32_t cpu_shares_v = 0;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_shares(cpu_shares, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_shares(cpu_shares_v, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_both_cpu_shares(1001, cpu_shares));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_shares(1001, cpu_shares_v));
ASSERT_EQ(cpu_shares, cpu_shares_v);
const int32_t cpu_cfs_quota = 80000;
int32_t cpu_cfs_quota_v = 0;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_cfs_quota(cpu_cfs_quota, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_cfs_quota(cpu_cfs_quota_v, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_both_cpu_cfs_quota(1001, cpu_cfs_quota));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_cfs_quota(1001, cpu_cfs_quota_v));
ASSERT_EQ(cpu_cfs_quota, cpu_cfs_quota_v);
for (int i = 0; i < 4; i++) {