[refactor](executor) Refactor workload meta update to BE #26710

wangbo
2023-11-18 11:19:38 +08:00
committed by GitHub
parent b1eef30b49
commit be7273da83
18 changed files with 277 additions and 155 deletions

View File

@@ -25,14 +25,14 @@ Status CgroupCpuCtl::init() {
     _doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
     if (_doris_cgroup_cpu_path.empty()) {
         LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path;
-        return Status::InternalError("doris cgroup cpu path {} is not specify.",
-                                     _doris_cgroup_cpu_path);
+        return Status::InternalError<false>("doris cgroup cpu path {} is not specify.",
+                                            _doris_cgroup_cpu_path);
     }
     if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
         LOG(ERROR) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path;
-        return Status::InternalError("doris cgroup cpu path {} not exists.",
-                                     _doris_cgroup_cpu_path);
+        return Status::InternalError<false>("doris cgroup cpu path {} not exists.",
+                                            _doris_cgroup_cpu_path);
     }
     if (_doris_cgroup_cpu_path.back() != '/') {
@@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() {
     return Status::OK();
 }
+
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) {
+    std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
+    *cpu_shares = this->_cpu_shares;
+    *cpu_hard_limit = this->_cpu_hard_limit;
+}
 
 void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
     if (!_init_succ) {
         return;
@@ -72,14 +78,14 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st
     int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
     if (fd == -1) {
         LOG(ERROR) << "open path failed, path=" << file_path;
-        return Status::InternalError("open path failed, path={}", file_path);
+        return Status::InternalError<false>("open path failed, path={}", file_path);
     }
     auto str = fmt::format("{}\n", value);
     int ret = write(fd, str.c_str(), str.size());
     if (ret == -1) {
         LOG(ERROR) << msg << " write sys file failed";
-        return Status::InternalError("{} write sys file failed", msg);
+        return Status::InternalError<false>("{} write sys file failed", msg);
     }
     LOG(INFO) << msg << " success";
     return Status::OK();
@@ -94,8 +100,8 @@ Status CgroupV1CpuCtl::init() {
     int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
     if (ret != 0) {
         LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
-        return Status::InternalError("cgroup v1 mkdir query failed, path=",
-                                     _cgroup_v1_cpu_query_path);
+        return Status::InternalError<false>("cgroup v1 mkdir query failed, path=",
+                                            _cgroup_v1_cpu_query_path);
     }
 }
@@ -105,8 +111,8 @@ Status CgroupV1CpuCtl::init() {
     int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
     if (ret != 0) {
         LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
-        return Status::InternalError("cgroup v1 mkdir workload group failed, path=",
-                                     _cgroup_v1_cpu_tg_path);
+        return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path=",
+                                            _cgroup_v1_cpu_tg_path);
     }
 }
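
Note on the recurring change in this file: every error return moves from `Status::InternalError(...)` to `Status::InternalError<false>(...)`. In Doris's `Status`, the boolean template parameter controls whether a stack trace is captured when the status is constructed, so `<false>` skips that cost for expected, self-explanatory configuration errors. A minimal sketch of the pattern, assuming a hypothetical `Status` type rather than the real Doris one:

```cpp
#include <fmt/core.h>
#include <string>
#include <string_view>
#include <utility>

// Minimal sketch, assuming a hypothetical Status type: the bool template
// parameter decides whether an (expensive) stack trace is captured.
struct Status {
    std::string msg;
    std::string stack; // stays empty when stacktrace capture is disabled

    template <bool stacktrace = true, typename... Args>
    static Status InternalError(std::string_view fmt_str, Args&&... args) {
        Status s;
        s.msg = fmt::format(fmt::runtime(fmt_str), std::forward<Args>(args)...);
        if constexpr (stacktrace) {
            s.stack = "<captured backtrace>"; // real code would walk the stack here
        }
        return s;
    }
};

// Usage mirroring the diff: no stack trace for an expected config error.
// Status st = Status::InternalError<false>("cgroup path {} not exists.", path);
```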

View File

@@ -47,6 +47,9 @@ public:
     void update_cpu_soft_limit(int cpu_shares);
+
+    // for log
+    void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
 
 protected:
     Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);

View File

@@ -25,6 +25,6 @@ class TopicListener {
 public:
     virtual ~TopicListener() {}
 
-    virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0;
+    virtual void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) = 0;
 };
 } // namespace doris

View File

@@ -42,8 +42,11 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     std::shared_lock lock(_listener_mtx);
     LOG(INFO) << "begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
-        listener_pair.second->handle_topic_info(topic_request);
-        LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+        if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
+            listener_pair.second->handle_topic_info(
+                    topic_request.topic_map.at(listener_pair.first));
+            LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+        }
     }
 }
 } // namespace doris
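
With this change the subscriber hands each listener only the list published under its own topic key instead of the whole request. A self-contained sketch of that keyed fan-out; `TopicType`, `TopicInfo`, and the listener interface below are simplified stand-ins for the generated Thrift types, not the real ones:

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-ins for the generated Thrift types.
enum class TopicType { WORKLOAD_GROUP };
struct TopicInfo { std::string payload; };

struct TopicListener {
    virtual ~TopicListener() = default;
    // Listeners now receive only the TopicInfo list for their own topic.
    virtual void handle_topic_info(const std::vector<TopicInfo>& infos) = 0;
};

class TopicSubscriber {
public:
    void register_listener(TopicType type, std::unique_ptr<TopicListener> listener) {
        _listeners[type] = std::move(listener);
    }

    // Fan out: a listener fires only if the publisher sent its key.
    void handle(const std::map<TopicType, std::vector<TopicInfo>>& topic_map) {
        for (auto& [type, listener] : _listeners) {
            auto it = topic_map.find(type);
            if (it != topic_map.end()) {
                listener->handle_topic_info(it->second);
            }
        }
    }

private:
    std::map<TopicType, std::unique_ptr<TopicListener>> _listeners;
};
```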

View File

@@ -24,18 +24,44 @@
 namespace doris {
 
-void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) {
+void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
     std::set<uint64_t> current_wg_ids;
-    for (const TopicInfo& topic_info : topic_request.topic_list) {
-        if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+    for (const TopicInfo& topic_info : topic_info_list) {
+        if (!topic_info.__isset.workload_group_info) {
             continue;
         }
-        int wg_id = 0;
-        auto iter2 = topic_info.info_map.find("id");
-        std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), wg_id);
-        current_wg_ids.insert(wg_id);
+
+        // 1 parse topicinfo to group info
+        taskgroup::TaskGroupInfo task_group_info;
+        Status ret = taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info,
+                                                                &task_group_info);
+        if (!ret.ok()) {
+            LOG(INFO) << "parse topic info failed, tg_id=" << task_group_info.id
+                      << ", reason:" << ret.to_string();
+            continue;
+        }
+        current_wg_ids.insert(task_group_info.id);
+
+        // 2 update task group
+        auto tg = _exec_env->task_group_manager()->get_or_create_task_group(task_group_info);
+
+        // 3 set cpu soft hard limit switch
+        _exec_env->task_group_manager()->_enable_cpu_hard_limit.store(
+                task_group_info.enable_cpu_hard_limit);
+
+        // 4 create and update task scheduler
+        Status ret2 =
+                _exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, _exec_env);
+        if (!ret2.ok()) {
+            LOG(WARNING) << "upsert task sche failed, tg_id=" << task_group_info.id
+                         << ", reason=" << ret2.to_string();
+        }
+
+        LOG(INFO) << "update task group success, tg info=" << tg->debug_string()
+                  << ", enable_cpu_hard_limit="
+                  << _exec_env->task_group_manager()->enable_cpu_hard_limit()
+                  << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
+                  << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit;
     }
     _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
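
`current_wg_ids` drives a reconciliation step: any group the FE stopped publishing is dropped on the BE. The body of `delete_task_group_by_ids` is not part of this diff; a hedged sketch of what such set-difference cleanup could look like (locking and member names are assumptions):

```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <unordered_map>

struct TaskGroup {};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;

class TaskGroupManager {
public:
    // Sketch: erase every cached group whose id the FE no longer publishes.
    void delete_task_group_by_ids(std::set<uint64_t> id_set) {
        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
        for (auto it = _task_groups.begin(); it != _task_groups.end();) {
            if (id_set.count(it->first) == 0) {
                it = _task_groups.erase(it); // stale: FE stopped publishing it
            } else {
                ++it;
            }
        }
    }

private:
    std::shared_mutex _group_mutex;
    std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
};
```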

View File

@@ -29,7 +29,7 @@ public:
     ~WorkloadGroupListener() {}
     WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
 
-    void handle_topic_info(const TPublishTopicRequest& topic_request) override;
+    void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override;
 
 private:
     ExecEnv* _exec_env;

View File

@@ -1090,8 +1090,8 @@ DEFINE_Bool(exit_on_exception, "false");
 DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
-DEFINE_String(doris_cgroup_cpu_path, "");
-DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
+DEFINE_mString(doris_cgroup_cpu_path, "");
+DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
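
The `m` prefix in `DEFINE_mString`/`DEFINE_mBool` marks a Doris config as mutable at runtime rather than startup-only, and the default for `enable_cgroup_cpu_soft_limit` also flips to `true`. A rough sketch of one way a mutable flag can be backed, not the real macro expansion:

```cpp
#include <atomic>

// Sketch: a startup-only config is a plain value read once; a mutable ("m")
// config can be rewritten by an admin endpoint while queries are running.
namespace config {
std::atomic<bool> enable_cgroup_cpu_soft_limit {true};

void set_enable_cgroup_cpu_soft_limit(bool v) {
    enable_cgroup_cpu_soft_limit.store(v, std::memory_order_relaxed);
}
} // namespace config

// Call sites read the latest value on every use:
// if (config::enable_cgroup_cpu_soft_limit.load()) { ... }
```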

View File

@@ -1163,8 +1163,8 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
 DECLARE_mBool(exit_on_exception);
 
 // cgroup
-DECLARE_String(doris_cgroup_cpu_path);
-DECLARE_Bool(enable_cgroup_cpu_soft_limit);
+DECLARE_mString(doris_cgroup_cpu_path);
+DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
 // This config controls whether the s3 file writer would flush cache asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);

View File

@@ -654,45 +654,28 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
         if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-            taskgroup::TaskGroupInfo task_group_info;
-            auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
-                                                                     &task_group_info);
-            if (status.ok()) {
-                auto tg = _exec_env->task_group_manager()->get_or_create_task_group(
-                        task_group_info);
-                tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                uint64_t tg_id = tg->id();
-                std::string tg_name = tg->name();
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
-                          << " use task group: " << tg->debug_string()
-                          << " cpu_hard_limit: " << task_group_info.cpu_hard_limit
-                          << " cpu_share:" << task_group_info.cpu_share
-                          << " enable cgroup soft cpu:" << config::enable_cgroup_cpu_soft_limit;
-                if (task_group_info.cpu_hard_limit > 0) {
-                    Status ret = _exec_env->task_group_manager()->create_and_get_task_scheduler(
-                            tg_id, tg_name, task_group_info.cpu_hard_limit,
-                            task_group_info.cpu_share, _exec_env, query_ctx.get());
-                    if (!ret.ok()) {
-                        LOG(INFO) << "workload group init failed "
-                                  << ", name=" << tg_name << ", id=" << tg_id
-                                  << ", reason=" << ret.to_string();
-                    }
+            uint64_t tg_id = params.workload_groups[0].id;
+            auto* tg_mgr = _exec_env->task_group_manager();
+            if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+                std::stringstream ss;
+                ss << "Query/load id: " << print_id(query_ctx->query_id());
+                ss << " use task group " << task_group_ptr->debug_string();
+                if (tg_mgr->enable_cpu_soft_limit() && !config::enable_cgroup_cpu_soft_limit) {
+                    query_ctx->set_task_group(task_group_ptr);
+                    ss << ", cpu soft limit based doris sche";
                 } else {
-                    if (!config::enable_cgroup_cpu_soft_limit) {
-                        query_ctx->set_task_group(tg);
+                    bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id, query_ctx.get());
+                    if (tg_mgr->enable_cpu_hard_limit()) {
+                        ss << ", cpu hard limit based cgroup";
                     } else {
-                        Status ret =
-                                _exec_env->task_group_manager()->create_and_get_task_scheduler(
-                                        tg_id, tg_name, task_group_info.cpu_hard_limit,
-                                        task_group_info.cpu_share, _exec_env, query_ctx.get());
-                        if (!ret.ok()) {
-                            LOG(INFO) << "workload group cpu soft limit init failed "
-                                      << ", name=" << tg_name << ", id=" << tg_id
-                                      << ", reason=" << ret.to_string();
-                        }
+                        ss << ", cpu soft limit based cgroup";
+                    }
+                    if (!ret) {
+                        ss << ", but cgroup init failed, scan or exec fallback to no group";
                     }
                 }
-            }
+                LOG(INFO) << ss.str();
+            } // else, query run with no group
         } else {
             VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())
                        << " does not use task group.";

View File

@@ -113,15 +113,16 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _cpu_share(tg_info.cpu_share),
           _task_entity(this, "pipeline task entity"),
           _local_scan_entity(this, "local scan entity"),
-          _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
+          _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
+          _cpu_hard_limit(tg_info.cpu_hard_limit) {}
 
 std::string TaskGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
-            "{}, version = {}]",
+            "{}, version = {}, cpu_hard_limit = {}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version);
+            _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit());
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -142,6 +143,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
             _memory_limit = tg_info.memory_limit;
             _enable_memory_overcommit = tg_info.enable_memory_overcommit;
             _cpu_share = tg_info.cpu_share;
+            _cpu_hard_limit = tg_info.cpu_hard_limit;
         } else {
             return;
         }
@@ -185,49 +187,80 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const {
     tg_info->version = _version;
 }
 
-Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_group,
-                                       TaskGroupInfo* task_group_info) {
-    if (UNLIKELY(!check_group_info(resource_group))) {
-        std::stringstream ss;
-        ss << "incomplete resource group parameters: ";
-        resource_group.printTo(ss);
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info,
+                                       taskgroup::TaskGroupInfo* task_group_info) {
+    // 1 id
+    int tg_id = 0;
+    if (workload_group_info.__isset.id) {
+        tg_id = workload_group_info.id;
+    } else {
+        return Status::InternalError<false>("workload group id is required");
     }
+    task_group_info->id = tg_id;
 
-    auto iter = resource_group.properties.find(CPU_SHARE);
-    uint64_t share = 0;
-    std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share);
+    // 2 name
+    std::string name = "INVALID_NAME";
+    if (workload_group_info.__isset.name) {
+        name = workload_group_info.name;
+    }
+    task_group_info->name = name;
 
-    int cpu_hard_limit = 0;
-    auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
-    std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(),
-                    cpu_hard_limit);
+    // 3 version
+    int version = 0;
+    if (workload_group_info.__isset.version) {
+        version = workload_group_info.version;
+    } else {
+        return Status::InternalError<false>("workload group version is required");
+    }
+    task_group_info->version = version;
 
-    task_group_info->id = resource_group.id;
-    task_group_info->name = resource_group.name;
-    task_group_info->version = resource_group.version;
-    task_group_info->cpu_share = share;
+    // 4 cpu_share
+    uint64_t cpu_share = 1024;
+    if (workload_group_info.__isset.cpu_share) {
+        cpu_share = workload_group_info.cpu_share;
+    }
+    task_group_info->cpu_share = cpu_share;
 
+    // 5 cpu hard limit
+    int cpu_hard_limit = -1;
+    if (workload_group_info.__isset.cpu_hard_limit) {
+        cpu_hard_limit = workload_group_info.cpu_hard_limit;
+    }
+    task_group_info->cpu_hard_limit = cpu_hard_limit;
 
+    // 6 mem_limit
     bool is_percent = true;
-    auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
-    auto mem_limit =
+    std::string mem_limit_str;
+    if (workload_group_info.__isset.mem_limit) {
+        mem_limit_str = workload_group_info.mem_limit;
+    } else {
+        return Status::InternalError<false>("workload group mem_limit is required");
+    }
+    int64_t mem_limit =
             ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent);
     if (UNLIKELY(mem_limit <= 0)) {
         std::stringstream ss;
-        ss << "parse memory limit from TPipelineWorkloadGroup error, " << MEMORY_LIMIT << ": "
-           << mem_limit_str;
+        ss << "parse memory limit error, " << MEMORY_LIMIT << ": " << mem_limit_str;
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InternalError<false>("invalid value for {}, val={}", MEMORY_LIMIT,
+                                            mem_limit);
     }
     task_group_info->memory_limit = mem_limit;
 
-    auto enable_memory_overcommit_iter = resource_group.properties.find(ENABLE_MEMORY_OVERCOMMIT);
-    task_group_info->enable_memory_overcommit =
-            enable_memory_overcommit_iter != resource_group.properties.end() &&
-            enable_memory_overcommit_iter->second ==
-                    "true" /* fe guarantees it is 'true' or 'false' */;
+    // 7 mem overcommit
+    bool enable_memory_overcommit = true;
+    if (workload_group_info.__isset.enable_memory_overcommit) {
+        enable_memory_overcommit = workload_group_info.enable_memory_overcommit;
+    }
+    task_group_info->enable_memory_overcommit = enable_memory_overcommit;
+
+    // 8 cpu soft limit or hard limit
+    bool enable_cpu_hard_limit = false;
+    if (workload_group_info.__isset.enable_cpu_hard_limit) {
+        enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit;
+    }
+    task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
 
     return Status::OK();
 }
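
`parse_topic_info` repeats one pattern eight times: consume an optional Thrift field through its `__isset` flag, falling back to a default or failing outright when a required field is absent. A runnable sketch of that pattern against a mock of the generated struct (the mock and its fields are illustrative):

```cpp
#include <cstdint>
#include <iostream>

// Mock of what the Thrift compiler generates: every optional field gets a
// matching flag in an __isset struct, set only when the sender wrote it.
struct TWorkloadGroupInfoMock {
    int64_t id = 0;
    int32_t cpu_hard_limit = 0;
    struct { bool id = false; bool cpu_hard_limit = false; } __isset;
};

bool parse(const TWorkloadGroupInfoMock& in, int64_t* out_id, int* out_hard_limit) {
    // Required field: absence is an error (mirrors the "id is required" branch).
    if (!in.__isset.id) return false;
    *out_id = in.id;
    // Optional field: fall back to -1, the "hard limit disabled" sentinel.
    *out_hard_limit = in.__isset.cpu_hard_limit ? in.cpu_hard_limit : -1;
    return true;
}

int main() {
    TWorkloadGroupInfoMock info;
    info.id = 42;
    info.__isset.id = true; // cpu_hard_limit deliberately left unset
    int64_t id = 0;
    int hard = 0;
    std::cout << parse(info, &id, &hard) << " id=" << id << " hard=" << hard << "\n";
}
```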

View File

@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/BackendService_types.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -106,6 +107,8 @@ public:
     uint64_t cpu_share() const { return _cpu_share.load(); }
 
+    int cpu_hard_limit() const { return _cpu_hard_limit.load(); }
+
     uint64_t id() const { return _id; }
 
     std::string name() const { return _name; };
@@ -147,6 +150,7 @@ private:
     TaskGroupPipelineTaskEntity _task_entity;
     TaskGroupScanTaskEntity _local_scan_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
+    std::atomic<int> _cpu_hard_limit;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
@@ -159,9 +163,13 @@ struct TaskGroupInfo {
     bool enable_memory_overcommit;
     int64_t version;
     int cpu_hard_limit;
+    bool enable_cpu_hard_limit;
+    // log cgroup cpu info
+    uint64_t cgroup_cpu_shares = 0;
+    uint64_t cgroup_cpu_hard_limit = 0;
 
-    static Status parse_group_info(const TPipelineWorkloadGroup& resource_group,
-                                   TaskGroupInfo* task_group_info);
+    static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
+                                   taskgroup::TaskGroupInfo* task_group_info);
 
 private:
     static bool check_group_info(const TPipelineWorkloadGroup& resource_group);

View File

@@ -59,10 +59,38 @@ void TaskGroupManager::get_resource_groups(const std::function<bool(const TaskGr
     }
 }
 
-Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::string tg_name,
-                                                       int cpu_hard_limit, int cpu_shares,
-                                                       ExecEnv* exec_env,
-                                                       QueryContext* query_ctx_ptr) {
+TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.find(tg_id) != _task_groups.end()) {
+        return _task_groups.at(tg_id);
+    }
+    return nullptr;
+}
+
+bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
+    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+        query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
+    } else {
+        return false;
+    }
+    if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+        query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get());
+    } else {
+        return false;
+    }
+    return true;
+}
+
+Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
+                                               ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+
     std::lock_guard<std::mutex> lock(_task_scheduler_lock);
 
     // step 1: init cgroup cpu controller
     CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
@@ -73,7 +101,7 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
             cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
             _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("cgroup init failed, gid={}", tg_id);
+            return Status::InternalError<false>("cgroup init failed, gid={}", tg_id);
         }
     }
@@ -92,8 +120,7 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
         if (ret.ok()) {
             _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("task scheduler start failed, gid={}",
-                                                        tg_id);
+            return Status::InternalError<false>("task scheduler start failed, gid={}", tg_id);
         }
     }
@@ -105,27 +132,27 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
         if (ret.ok()) {
             _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("scan scheduler start failed, gid={}",
-                                                        tg_id);
+            return Status::InternalError<false>("scan scheduler start failed, gid={}", tg_id);
         }
     }
 
-    // step 4 set exec/scan task scheudler to query ctx
-    pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
-    query_ctx_ptr->set_task_scheduler(task_sche);
-    vectorized::SimplifiedScanScheduler* scan_task_sche = _tg_scan_sche_map.at(tg_id).get();
-    query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
-
-    // step 5 update cgroup cpu if needed
-    if (cpu_hard_limit > 0) {
-        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
-        _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+    // step 4 update cgroup cpu if needed
+    if (enable_cpu_hard_limit) {
+        if (cpu_hard_limit > 0) {
+            _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+            _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+        } else {
+            return Status::InternalError<false>("enable cpu hard limit but value is illegal");
+        }
     } else {
-        _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
-        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
-                CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+        if (config::enable_cgroup_cpu_soft_limit) {
+            _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
+            _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
+                    CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+        }
     }
+    _cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                                   &(tg_info->cgroup_cpu_hard_limit));
 
     return Status::OK();
 }
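
Steps 1 through 3 above share one upsert idiom: look the group id up in a map under `_task_scheduler_lock` and construct-and-emplace only on a miss, so republishing the same workload group is idempotent. A distilled sketch of that idiom with simplified types:

```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct Scheduler {
    explicit Scheduler(std::string n) : name(std::move(n)) {}
    std::string name;
};

class SchedulerRegistry {
public:
    // Idempotent upsert: repeated publishes of the same group id reuse the
    // scheduler created the first time instead of rebuilding thread pools.
    Scheduler* upsert(uint64_t tg_id, const std::string& tg_name) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _schedulers.find(tg_id);
        if (it == _schedulers.end()) {
            it = _schedulers.emplace(tg_id, std::make_unique<Scheduler>(tg_name)).first;
        }
        return it->second.get();
    }

private:
    std::mutex _mutex;
    std::unordered_map<uint64_t, std::unique_ptr<Scheduler>> _schedulers;
};
```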

View File

@@ -51,14 +51,22 @@ public:
     void get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
                              std::vector<TaskGroupPtr>* task_groups);
 
-    Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit,
-                                         int cpu_shares, ExecEnv* exec_env,
-                                         QueryContext* query_ctx_ptr);
+    Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
 
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 
+    TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
+
     void stop();
 
+    std::atomic<bool> _enable_cpu_hard_limit = false;
+
+    bool enable_cpu_soft_limit() { return !_enable_cpu_hard_limit.load(); }
+
+    bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
+
+    bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;

View File

@@ -19,7 +19,7 @@ package org.apache.doris.common.publish;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TopicInfo;
+import org.apache.doris.thrift.TTopicInfoType;
 
 public class WorkloadGroupPublisher implements TopicPublisher {
@@ -31,9 +31,7 @@ public class WorkloadGroupPublisher implements TopicPublisher {
     @Override
     public void getTopicInfo(TPublishTopicRequest req) {
-        for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
-                .getPublishTopicInfo()) {
-            req.addToTopicList(topicInfo);
-        }
+        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
+                env.getWorkloadGroupMgr().getPublishTopicInfo());
     }
 }

View File

@@ -18,6 +18,7 @@
 package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -25,7 +26,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
-import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
@@ -311,20 +312,44 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
     }
 
     public TPipelineWorkloadGroup toThrift() {
-        //note(wb) we need add a new key-value to properties and then transfer it to be, so need a copy here
-        // see WorkloadGroupMgr.getWorkloadGroup
-        HashMap<String, String> clonedHashMap = new HashMap<>();
-        clonedHashMap.putAll(properties);
-        return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
+        return new TPipelineWorkloadGroup().setId(id);
     }
 
     public TopicInfo toTopicInfo() {
-        HashMap<String, String> newHashMap = new HashMap<>();
-        newHashMap.put("id", String.valueOf(id));
+        TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
+        tWorkloadGroupInfo.setId(id);
+        tWorkloadGroupInfo.setName(name);
+        tWorkloadGroupInfo.setVersion(version);
+
+        String cpuShareStr = properties.get(CPU_SHARE);
+        if (cpuShareStr != null) {
+            tWorkloadGroupInfo.setCpuShare(Long.valueOf(cpuShareStr));
+        }
+        String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT);
+        if (cpuHardLimitStr != null) {
+            tWorkloadGroupInfo.setCpuHardLimit(Integer.valueOf(cpuHardLimitStr));
+        }
+        String memLimitStr = properties.get(MEMORY_LIMIT);
+        if (memLimitStr != null) {
+            tWorkloadGroupInfo.setMemLimit(memLimitStr);
+        }
+        String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
+        if (memOvercommitStr != null) {
+            tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
+        }
+
+        // enable_cpu_hard_limit = true, using cpu hard limit
+        // enable_cpu_hard_limit = false, using cpu soft limit
+        tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit);
+        if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) {
+            LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value not illegal,"
+                    + "id=" + id + ",name=" + name);
+        }
+
         TopicInfo topicInfo = new TopicInfo();
-        topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP);
-        topicInfo.setInfoMap(newHashMap);
-        topicInfo.setTopicKey(name);
+        topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
     }

View File

@@ -125,13 +125,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
                 throw new UserException("Workload group " + groupName + " does not exist");
             }
             workloadGroups.add(workloadGroup.toThrift());
-            // note(wb) -1 to tell be no need to not use cpu hard limit
-            int cpuHardLimitThriftVal = -1;
-            if (Config.enable_cpu_hard_limit && workloadGroup.getCpuHardLimit() > 0) {
-                cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
-            }
-            workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
-                    String.valueOf(cpuHardLimitThriftVal));
             context.setWorkloadGroupName(groupName);
         } finally {
             readUnlock();

View File

@@ -30,7 +30,7 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.collect.Maps;
 import mockit.Delegate;
@@ -169,11 +169,11 @@ public class WorkloadGroupMgrTest {
         CreateWorkloadGroupStmt stmt1 = new CreateWorkloadGroupStmt(false, name1, properties1);
         workloadGroupMgr.createWorkloadGroup(stmt1);
         context.getSessionVariable().setWorkloadGroup(name1);
-        List<TPipelineWorkloadGroup> tWorkloadGroups1 = workloadGroupMgr.getWorkloadGroup(context);
+        List<TopicInfo> tWorkloadGroups1 = workloadGroupMgr.getPublishTopicInfo();
         Assert.assertEquals(1, tWorkloadGroups1.size());
-        TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups1.get(0);
-        Assert.assertEquals(name1, tWorkloadGroup1.getName());
-        Assert.assertTrue(tWorkloadGroup1.getProperties().containsKey(WorkloadGroup.CPU_SHARE));
+        TopicInfo tWorkloadGroup1 = tWorkloadGroups1.get(0);
+        Assert.assertEquals(name1, tWorkloadGroup1.getWorkloadGroupInfo().getName());
+        Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 10);
 
         try {
             context.getSessionVariable().setWorkloadGroup("g2");
@@ -242,9 +242,9 @@ public class WorkloadGroupMgrTest {
         workloadGroupMgr.alterWorkloadGroup(stmt2);
         context.getSessionVariable().setWorkloadGroup(name);
-        List<TPipelineWorkloadGroup> tWorkloadGroups = workloadGroupMgr.getWorkloadGroup(context);
+        List<TopicInfo> tWorkloadGroups = workloadGroupMgr.getPublishTopicInfo();
         Assert.assertEquals(1, tWorkloadGroups.size());
-        TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups.get(0);
-        Assert.assertEquals(tWorkloadGroup1.getProperties().get(WorkloadGroup.CPU_SHARE), "5");
+        TopicInfo tWorkloadGroup1 = tWorkloadGroups.get(0);
+        Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5);
     }
 }

View File

@@ -164,14 +164,23 @@ enum TTopicInfoType {
     WORKLOAD_GROUP
 }
 
+struct TWorkloadGroupInfo {
+  1: optional i64 id
+  2: optional string name
+  3: optional i64 version
+  4: optional i64 cpu_share
+  5: optional i32 cpu_hard_limit
+  6: optional string mem_limit
+  7: optional bool enable_memory_overcommit
+  8: optional bool enable_cpu_hard_limit
+}
+
 struct TopicInfo {
-    1: optional string topic_key
-    2: required TTopicInfoType topic_type
-    3: optional map<string, string> info_map
+    1: optional TWorkloadGroupInfo workload_group_info
 }
 
 struct TPublishTopicRequest {
-    1: required list<TopicInfo> topic_list
+    1: required map<TTopicInfoType, list<TopicInfo>> topic_map
 }
 
 struct TPublishTopicResult {