Add an internal workload group when Doris started, currently it mainly used to manage compaction workload cpu usage. pick #42006
This commit is contained in:
@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
|
||||
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
|
||||
}
|
||||
|
||||
std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
|
||||
std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
|
||||
if (_is_enable_cgroup_v2_in_env) {
|
||||
return std::make_unique<CgroupV2CpuCtl>(wg_id);
|
||||
return std::make_shared<CgroupV2CpuCtl>(wg_id);
|
||||
} else if (_is_enable_cgroup_v1_in_env) {
|
||||
return std::make_unique<CgroupV1CpuCtl>(wg_id);
|
||||
return std::make_shared<CgroupV1CpuCtl>(wg_id);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ public:
|
||||
|
||||
static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
|
||||
|
||||
static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
|
||||
static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
|
||||
|
||||
static bool is_a_valid_cgroup_path(std::string cg_path);
|
||||
|
||||
|
||||
@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
|
||||
// eg, update workload info may delay other listener, then we need add a thread here
|
||||
// to handle_topic_info asynchronous
|
||||
std::shared_lock lock(_listener_mtx);
|
||||
LOG(INFO) << "[topic_publish]begin handle topic info";
|
||||
for (auto& listener_pair : _registered_listeners) {
|
||||
if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
|
||||
LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
|
||||
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
|
||||
listener_pair.second->handle_topic_info(
|
||||
topic_request.topic_map.at(listener_pair.first));
|
||||
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
|
||||
LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
|
||||
<< ", size=" << topic_request.topic_map.at(listener_pair.first).size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +125,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
|
||||
return threads_num;
|
||||
}
|
||||
|
||||
Status StorageEngine::start_bg_threads() {
|
||||
Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
|
||||
RETURN_IF_ERROR(Thread::create(
|
||||
"StorageEngine", "unused_rowset_monitor_thread",
|
||||
[this]() { this->_unused_rowset_monitor_thread_callback(); },
|
||||
@ -155,29 +155,60 @@ Status StorageEngine::start_bg_threads() {
|
||||
auto single_replica_compaction_threads =
|
||||
get_single_replica_compaction_threads_num(data_dirs.size());
|
||||
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
|
||||
.set_min_threads(base_compaction_threads)
|
||||
.set_max_threads(base_compaction_threads)
|
||||
.build(&_base_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
|
||||
.set_min_threads(cumu_compaction_threads)
|
||||
.set_max_threads(cumu_compaction_threads)
|
||||
.build(&_cumu_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
|
||||
.set_min_threads(single_replica_compaction_threads)
|
||||
.set_max_threads(single_replica_compaction_threads)
|
||||
.build(&_single_replica_compaction_thread_pool));
|
||||
if (wg_sptr && wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
|
||||
.set_min_threads(base_compaction_threads)
|
||||
.set_max_threads(base_compaction_threads)
|
||||
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
|
||||
.build(&_base_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
|
||||
.set_min_threads(cumu_compaction_threads)
|
||||
.set_max_threads(cumu_compaction_threads)
|
||||
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
|
||||
.build(&_cumu_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
|
||||
.set_min_threads(single_replica_compaction_threads)
|
||||
.set_max_threads(single_replica_compaction_threads)
|
||||
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
|
||||
.build(&_single_replica_compaction_thread_pool));
|
||||
|
||||
if (config::enable_segcompaction) {
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
|
||||
.set_min_threads(config::segcompaction_num_threads)
|
||||
.set_max_threads(config::segcompaction_num_threads)
|
||||
.build(&_seg_compaction_thread_pool));
|
||||
if (config::enable_segcompaction) {
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
|
||||
.set_min_threads(config::segcompaction_num_threads)
|
||||
.set_max_threads(config::segcompaction_num_threads)
|
||||
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
|
||||
.build(&_seg_compaction_thread_pool));
|
||||
}
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
|
||||
.set_min_threads(config::cold_data_compaction_thread_num)
|
||||
.set_max_threads(config::cold_data_compaction_thread_num)
|
||||
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
|
||||
.build(&_cold_data_compaction_thread_pool));
|
||||
} else {
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
|
||||
.set_min_threads(base_compaction_threads)
|
||||
.set_max_threads(base_compaction_threads)
|
||||
.build(&_base_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
|
||||
.set_min_threads(cumu_compaction_threads)
|
||||
.set_max_threads(cumu_compaction_threads)
|
||||
.build(&_cumu_compaction_thread_pool));
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
|
||||
.set_min_threads(single_replica_compaction_threads)
|
||||
.set_max_threads(single_replica_compaction_threads)
|
||||
.build(&_single_replica_compaction_thread_pool));
|
||||
|
||||
if (config::enable_segcompaction) {
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
|
||||
.set_min_threads(config::segcompaction_num_threads)
|
||||
.set_max_threads(config::segcompaction_num_threads)
|
||||
.build(&_seg_compaction_thread_pool));
|
||||
}
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
|
||||
.set_min_threads(config::cold_data_compaction_thread_num)
|
||||
.set_max_threads(config::cold_data_compaction_thread_num)
|
||||
.build(&_cold_data_compaction_thread_pool));
|
||||
}
|
||||
RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
|
||||
.set_min_threads(config::cold_data_compaction_thread_num)
|
||||
.set_max_threads(config::cold_data_compaction_thread_num)
|
||||
.build(&_cold_data_compaction_thread_pool));
|
||||
|
||||
// compaction tasks producer thread
|
||||
RETURN_IF_ERROR(Thread::create(
|
||||
|
||||
@ -71,6 +71,7 @@ class TxnManager;
|
||||
class ReportWorker;
|
||||
class CreateTabletIdxCache;
|
||||
struct DirInfo;
|
||||
class WorkloadGroup;
|
||||
|
||||
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
|
||||
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
|
||||
@ -171,7 +172,7 @@ public:
|
||||
}
|
||||
|
||||
// start all background threads. This should be call after env is ready.
|
||||
Status start_bg_threads();
|
||||
Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr);
|
||||
|
||||
// clear trash and snapshot file
|
||||
// option: update disk usage after sweep
|
||||
|
||||
@ -76,7 +76,7 @@ class TaskScheduler {
|
||||
public:
|
||||
TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
|
||||
std::shared_ptr<TaskQueue> task_queue, std::string name,
|
||||
CgroupCpuCtl* cgroup_cpu_ctl)
|
||||
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
|
||||
: _task_queue(std::move(task_queue)),
|
||||
_blocked_task_scheduler(std::move(b_scheduler)),
|
||||
_shutdown(false),
|
||||
@ -102,7 +102,7 @@ private:
|
||||
std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
|
||||
std::atomic<bool> _shutdown;
|
||||
std::string _name;
|
||||
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
|
||||
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
|
||||
|
||||
void _do_work(size_t index);
|
||||
};
|
||||
|
||||
@ -222,6 +222,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
|
||||
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
|
||||
RETURN_IF_ERROR(init_pipeline_task_scheduler());
|
||||
_workload_group_manager = new WorkloadGroupMgr();
|
||||
_workload_group_manager->init_internal_workload_group();
|
||||
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
|
||||
_fragment_mgr = new FragmentMgr(this);
|
||||
_result_cache = new ResultCache(config::query_cache_max_size_mb,
|
||||
@ -295,7 +296,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
|
||||
return st;
|
||||
}
|
||||
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
|
||||
if (st = _storage_engine->start_bg_threads(); !st.ok()) {
|
||||
WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
|
||||
if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
|
||||
LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
|
||||
return st;
|
||||
}
|
||||
|
||||
@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
|
||||
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
|
||||
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
|
||||
|
||||
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
|
||||
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}
|
||||
|
||||
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool)
|
||||
: _id(tg_info.id),
|
||||
_name(tg_info.name),
|
||||
_version(tg_info.version),
|
||||
@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
|
||||
_spill_low_watermark(tg_info.spill_low_watermark),
|
||||
_spill_high_watermark(tg_info.spill_high_watermark),
|
||||
_scan_bytes_per_second(tg_info.read_bytes_per_second),
|
||||
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
|
||||
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
|
||||
_need_create_query_thread_pool(need_create_query_thread_pool) {
|
||||
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
|
||||
for (const auto& data_dir : data_dir_list) {
|
||||
_scan_io_throttle_map[data_dir.path] =
|
||||
@ -419,35 +422,42 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
|
||||
uint64_t tg_id = tg_info->id;
|
||||
std::string tg_name = tg_info->name;
|
||||
int cpu_hard_limit = tg_info->cpu_hard_limit;
|
||||
uint64_t cpu_shares = tg_info->cpu_share;
|
||||
bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
|
||||
int scan_thread_num = tg_info->scan_thread_num;
|
||||
int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
|
||||
int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
|
||||
std::weak_ptr<CgroupCpuCtl> WorkloadGroup::get_cgroup_cpu_ctl_wptr() {
|
||||
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
|
||||
return _cgroup_cpu_ctl;
|
||||
}
|
||||
|
||||
void WorkloadGroup::create_cgroup_cpu_ctl() {
|
||||
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
|
||||
create_cgroup_cpu_ctl_no_lock();
|
||||
}
|
||||
|
||||
void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
|
||||
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
|
||||
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
|
||||
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id);
|
||||
if (cgroup_cpu_ctl) {
|
||||
Status ret = cgroup_cpu_ctl->init();
|
||||
if (ret.ok()) {
|
||||
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
|
||||
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
|
||||
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << _id;
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id
|
||||
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << _id
|
||||
<< ", reason=" << ret.to_string();
|
||||
}
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
|
||||
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl wg_id=" << _id << " failed";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
|
||||
|
||||
void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
|
||||
std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr,
|
||||
ExecEnv* exec_env) {
|
||||
uint64_t wg_id = wg_info->id;
|
||||
std::string wg_name = wg_info->name;
|
||||
int scan_thread_num = wg_info->scan_thread_num;
|
||||
int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
|
||||
int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
|
||||
if (_task_sched == nullptr) {
|
||||
int32_t executors_size = config::pipeline_executor_size;
|
||||
if (executors_size <= 0) {
|
||||
@ -457,18 +467,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
|
||||
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
|
||||
std::make_unique<pipeline::TaskScheduler>(
|
||||
exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
|
||||
"Pipe_" + tg_name, cg_cpu_ctl_ptr);
|
||||
"Pipe_" + wg_name, cg_cpu_ctl_ptr);
|
||||
Status ret = pipeline_task_scheduler->start();
|
||||
if (ret.ok()) {
|
||||
_task_sched = std::move(pipeline_task_scheduler);
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
|
||||
LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << wg_id;
|
||||
}
|
||||
}
|
||||
|
||||
if (_scan_task_sched == nullptr) {
|
||||
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
|
||||
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
|
||||
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
|
||||
cg_cpu_ctl_ptr);
|
||||
Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
|
||||
config::doris_scanner_thread_pool_thread_num,
|
||||
@ -476,34 +486,33 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
|
||||
if (ret.ok()) {
|
||||
_scan_task_sched = std::move(scan_scheduler);
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
|
||||
LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << wg_id;
|
||||
}
|
||||
}
|
||||
if (scan_thread_num > 0 && _scan_task_sched) {
|
||||
_scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
|
||||
}
|
||||
|
||||
if (_remote_scan_task_sched == nullptr) {
|
||||
int remote_max_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num();
|
||||
int remote_scan_thread_queue_size =
|
||||
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
|
||||
std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
|
||||
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
|
||||
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
|
||||
cg_cpu_ctl_ptr);
|
||||
Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
|
||||
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
|
||||
config::doris_scanner_min_thread_pool_thread_num,
|
||||
remote_scan_thread_queue_size);
|
||||
if (ret.ok()) {
|
||||
_remote_scan_task_sched = std::move(remote_scan_scheduler);
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid="
|
||||
<< tg_id;
|
||||
<< wg_id;
|
||||
}
|
||||
}
|
||||
if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
|
||||
_remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
|
||||
min_remote_scan_thread_num);
|
||||
}
|
||||
|
||||
if (_memtable_flush_pool == nullptr) {
|
||||
int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num();
|
||||
// -1 means disk num may not be inited, so not create flush pool
|
||||
@ -512,7 +521,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
|
||||
|
||||
size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store);
|
||||
size_t max_threads = num_disk * min_threads;
|
||||
std::string pool_name = "wg_flush_" + tg_name;
|
||||
std::string pool_name = "wg_flush_" + wg_name;
|
||||
auto ret = ThreadPoolBuilder(pool_name)
|
||||
.set_min_threads(min_threads)
|
||||
.set_max_threads(max_threads)
|
||||
@ -520,17 +529,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
|
||||
.build(&thread_pool);
|
||||
if (!ret.ok()) {
|
||||
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
|
||||
<< tg_id;
|
||||
<< wg_id;
|
||||
} else {
|
||||
_memtable_flush_pool = std::move(thread_pool);
|
||||
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
|
||||
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << wg_id
|
||||
<< ", max thread num=" << max_threads
|
||||
<< ", min thread num=" << min_threads;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
|
||||
uint64_t wg_id = wg_info->id;
|
||||
int cpu_hard_limit = wg_info->cpu_hard_limit;
|
||||
uint64_t cpu_shares = wg_info->cpu_share;
|
||||
bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
|
||||
create_cgroup_cpu_ctl_no_lock();
|
||||
|
||||
// step 6: update cgroup cpu if needed
|
||||
if (_cgroup_cpu_ctl) {
|
||||
if (enable_cpu_hard_limit) {
|
||||
if (cpu_hard_limit > 0) {
|
||||
@ -538,16 +554,26 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
|
||||
_cgroup_cpu_ctl->update_cpu_soft_limit(
|
||||
CgroupCpuCtl::cpu_soft_limit_default_value());
|
||||
} else {
|
||||
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
|
||||
<< cpu_hard_limit << ", gid=" << tg_id;
|
||||
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
|
||||
"illegal: "
|
||||
<< cpu_hard_limit << ", gid=" << wg_id;
|
||||
}
|
||||
} else {
|
||||
_cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
|
||||
_cgroup_cpu_ctl->update_cpu_hard_limit(
|
||||
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
|
||||
}
|
||||
_cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
|
||||
&(tg_info->cgroup_cpu_hard_limit));
|
||||
_cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
|
||||
&(wg_info->cgroup_cpu_hard_limit));
|
||||
}
|
||||
}
|
||||
|
||||
void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info, ExecEnv* exec_env) {
|
||||
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
|
||||
upsert_cgroup_cpu_ctl_no_lock(wg_info);
|
||||
|
||||
if (_need_create_query_thread_pool) {
|
||||
upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl, exec_env);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -59,6 +59,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
|
||||
public:
|
||||
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
|
||||
|
||||
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);
|
||||
|
||||
int64_t version() const { return _version; }
|
||||
|
||||
uint64_t cpu_share() const { return _cpu_share.load(); }
|
||||
@ -210,7 +212,17 @@ public:
|
||||
}
|
||||
int64_t get_remote_scan_bytes_per_second();
|
||||
|
||||
void create_cgroup_cpu_ctl();
|
||||
|
||||
std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
|
||||
|
||||
private:
|
||||
void create_cgroup_cpu_ctl_no_lock();
|
||||
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
|
||||
void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
|
||||
std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr,
|
||||
ExecEnv* exec_env);
|
||||
|
||||
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
|
||||
const uint64_t _id;
|
||||
std::string _name;
|
||||
@ -241,7 +253,10 @@ private:
|
||||
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
|
||||
|
||||
std::shared_mutex _task_sched_lock;
|
||||
std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
|
||||
// _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup,
|
||||
// but also some global background threadpool which not owned by WorkloadGroup,
|
||||
// so it should be shared ptr;
|
||||
std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
|
||||
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
|
||||
std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
|
||||
std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
|
||||
@ -250,6 +265,9 @@ private:
|
||||
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
|
||||
std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
|
||||
|
||||
// for some background workload, it doesn't need to create query thread pool
|
||||
const bool _need_create_query_thread_pool;
|
||||
|
||||
// bvar metric
|
||||
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
|
||||
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
|
||||
|
||||
@ -33,6 +33,25 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
void WorkloadGroupMgr::init_internal_workload_group() {
|
||||
WorkloadGroupPtr internal_wg = nullptr;
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
|
||||
if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
|
||||
WorkloadGroupInfo internal_wg_info {
|
||||
.id = INTERNAL_WORKLOAD_GROUP_ID,
|
||||
.name = INTERNAL_WORKLOAD_GROUP_NAME,
|
||||
.cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
|
||||
internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
|
||||
_workload_groups[internal_wg_info.id] = internal_wg;
|
||||
}
|
||||
}
|
||||
DCHECK(internal_wg != nullptr);
|
||||
if (internal_wg) {
|
||||
internal_wg->create_cgroup_cpu_ctl();
|
||||
}
|
||||
}
|
||||
|
||||
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
|
||||
const WorkloadGroupInfo& workload_group_info) {
|
||||
{
|
||||
@ -85,6 +104,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
|
||||
old_wg_size = _workload_groups.size();
|
||||
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
|
||||
uint64_t wg_id = iter->first;
|
||||
// internal workload group created by BE can not be dropped
|
||||
if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
|
||||
continue;
|
||||
}
|
||||
auto workload_group_ptr = iter->second;
|
||||
if (used_wg_id.find(wg_id) == used_wg_id.end()) {
|
||||
workload_group_ptr->shutdown();
|
||||
|
||||
@ -36,11 +36,18 @@ class TaskScheduler;
|
||||
class MultiCoreTaskQueue;
|
||||
} // namespace pipeline
|
||||
|
||||
// internal_group is used for doris internal workload, currently is mainly compaction
|
||||
const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
|
||||
static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
|
||||
const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
|
||||
|
||||
class WorkloadGroupMgr {
|
||||
public:
|
||||
WorkloadGroupMgr() = default;
|
||||
~WorkloadGroupMgr() = default;
|
||||
|
||||
void init_internal_workload_group();
|
||||
|
||||
WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
|
||||
|
||||
void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
|
||||
@ -62,6 +69,11 @@ public:
|
||||
|
||||
void get_wg_resource_usage(vectorized::Block* block);
|
||||
|
||||
WorkloadGroupPtr get_internal_wg() {
|
||||
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
|
||||
return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_mutex _group_mutex;
|
||||
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
|
||||
|
||||
@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
|
||||
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
|
||||
std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
|
||||
_cgroup_cpu_ctl = cgroup_cpu_ctl;
|
||||
return *this;
|
||||
}
|
||||
@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
|
||||
_num_threads++;
|
||||
_num_threads_pending_start--;
|
||||
|
||||
if (_cgroup_cpu_ctl != nullptr) {
|
||||
static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
|
||||
if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
|
||||
static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
|
||||
}
|
||||
|
||||
// Owned by this worker thread and added/removed from _idle_threads as needed.
|
||||
|
||||
@ -107,7 +107,7 @@ public:
|
||||
ThreadPoolBuilder& set_min_threads(int min_threads);
|
||||
ThreadPoolBuilder& set_max_threads(int max_threads);
|
||||
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
|
||||
ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
|
||||
ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
|
||||
template <class Rep, class Period>
|
||||
ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
|
||||
_idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
|
||||
@ -133,7 +133,7 @@ private:
|
||||
int _min_threads;
|
||||
int _max_threads;
|
||||
int _max_queue_size;
|
||||
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
|
||||
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
|
||||
std::chrono::milliseconds _idle_timeout;
|
||||
|
||||
ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
|
||||
@ -345,7 +345,7 @@ private:
|
||||
// Protected by _lock.
|
||||
int _total_queued_tasks;
|
||||
|
||||
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
|
||||
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
|
||||
|
||||
// All allocated tokens.
|
||||
//
|
||||
|
||||
@ -114,11 +114,8 @@ struct SimplifiedScanTask {
|
||||
|
||||
class SimplifiedScanScheduler {
|
||||
public:
|
||||
SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
|
||||
_is_stop.store(false);
|
||||
_cgroup_cpu_ctl = cgroup_cpu_ctl;
|
||||
_sched_name = sched_name;
|
||||
}
|
||||
SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
|
||||
: _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {}
|
||||
|
||||
~SimplifiedScanScheduler() {
|
||||
stop();
|
||||
@ -217,7 +214,7 @@ public:
|
||||
private:
|
||||
std::unique_ptr<ThreadPool> _scan_thread_pool;
|
||||
std::atomic<bool> _is_stop;
|
||||
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
|
||||
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
|
||||
std::string _sched_name;
|
||||
};
|
||||
|
||||
|
||||
@ -113,6 +113,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
|
||||
force_close(status);
|
||||
}
|
||||
|
||||
if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
|
||||
if (auto cg_ctl_sptr =
|
||||
state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
|
||||
Status ret = cg_ctl_sptr->add_thread_to_cgroup();
|
||||
if (ret.ok()) {
|
||||
std::string wg_tname =
|
||||
"asyc_wr_" + state->get_query_ctx()->workload_group()->name();
|
||||
Thread::set_self_name(wg_tname);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_writer_status.ok()) {
|
||||
while (true) {
|
||||
ThreadCpuStopWatch cpu_time_stop_watch;
|
||||
|
||||
@ -25,6 +25,10 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -55,14 +59,26 @@ public class AlterWorkloadGroupStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
if (properties == null || properties.isEmpty()) {
|
||||
throw new AnalysisException("Resource group properties can't be null");
|
||||
throw new AnalysisException("Workload Group properties can't be empty");
|
||||
}
|
||||
|
||||
if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
|
||||
throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(WorkloadGroup.TAG);
|
||||
if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
|
||||
|| WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
|
||||
throw new AnalysisException(
|
||||
WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
+ " group can not set tag");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ALTER RESOURCE GROUP '").append(workloadGroupName).append("' ");
|
||||
sb.append("ALTER WORKLOAD GROUP '").append(workloadGroupName).append("' ");
|
||||
sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@ -27,6 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -68,12 +71,19 @@ public class CreateWorkloadGroupStmt extends DdlStmt {
|
||||
FeNameFormat.checkWorkloadGroupName(workloadGroupName);
|
||||
|
||||
if (properties == null || properties.isEmpty()) {
|
||||
throw new AnalysisException("Resource group properties can't be null");
|
||||
throw new AnalysisException("Workload Group properties can't be empty");
|
||||
}
|
||||
|
||||
String wgTag = properties.get(WorkloadGroup.TAG);
|
||||
if (wgTag != null) {
|
||||
FeNameFormat.checkCommonName("workload group tag", wgTag);
|
||||
if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
|
||||
throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(WorkloadGroup.TAG);
|
||||
if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
|
||||
|| WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
|
||||
throw new AnalysisException(
|
||||
WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
+ " group can not set tag");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.analysis;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -50,8 +49,6 @@ public class DropWorkloadGroupStmt extends DdlStmt {
|
||||
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
|
||||
FeNameFormat.checkWorkloadGroupName(workloadGroupName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -246,6 +246,7 @@ import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.resource.AdmissionControl;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
|
||||
@ -1751,6 +1752,7 @@ public class Env {
|
||||
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
|
||||
topicPublisherThread.addToTopicPublisherList(wpPublisher);
|
||||
topicPublisherThread.start();
|
||||
new CreateInternalWorkloadGroupThread().start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -50,6 +50,9 @@ public class FeConstants {
|
||||
// set to false to disable internal schema db
|
||||
public static boolean enableInternalSchemaDb = true;
|
||||
|
||||
// for UT, create internal workload group thread can not start
|
||||
public static boolean shouldCreateInternalWorkloadGroup = true;
|
||||
|
||||
// default scheduler interval is 10 seconds
|
||||
public static int default_scheduler_interval_millisecond = 10000;
|
||||
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class CreateInternalWorkloadGroupThread extends Thread {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
|
||||
|
||||
public CreateInternalWorkloadGroupThread() {
|
||||
super("CreateInternalWorkloadGroupThread");
|
||||
}
|
||||
|
||||
public void run() {
|
||||
if (!FeConstants.shouldCreateInternalWorkloadGroup) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Env env = Env.getCurrentEnv();
|
||||
while (!env.isReady()) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
if (!env.getWorkloadGroupMgr()
|
||||
.isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
|
||||
env.getWorkloadGroupMgr().createInternalWorkloadGroup();
|
||||
LOG.info("create internal workload group succ");
|
||||
} else {
|
||||
LOG.info("internal workload group already exists.");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("create internal workload group failed. ", t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TWorkloadGroupInfo;
|
||||
import org.apache.doris.thrift.TWorkloadType;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -43,8 +45,11 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);
|
||||
@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
|
||||
|
||||
// it's used to define Doris's internal workload group,
|
||||
// currently it is internal, only contains compaction
|
||||
// later more type and workload may be included in the future.
|
||||
public static final String INTERNAL_TYPE = "internal_type";
|
||||
|
||||
// NOTE(wb): all property is not required, some properties default value is set in be
|
||||
// default value is as followed
|
||||
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
|
||||
@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
|
||||
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
|
||||
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
|
||||
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
|
||||
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
|
||||
|
||||
public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
|
||||
.put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build();
|
||||
|
||||
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
|
||||
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
|
||||
@ -386,18 +399,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
+ SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(TAG);
|
||||
if (!StringUtils.isEmpty(tagStr)) {
|
||||
String[] tagArr = tagStr.split(",");
|
||||
for (String tag : tagArr) {
|
||||
try {
|
||||
FeNameFormat.checkCommonName("workload group tag name", tag);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException("workload group tag name format is illegal, " + tagStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (properties.containsKey(READ_BYTES_PER_SECOND)) {
|
||||
String readBytesVal = properties.get(READ_BYTES_PER_SECOND);
|
||||
try {
|
||||
@ -427,6 +428,37 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
String tagStr = properties.get(TAG);
|
||||
if (!StringUtils.isEmpty(tagStr)) {
|
||||
String[] tagArr = tagStr.split(",");
|
||||
for (String tag : tagArr) {
|
||||
try {
|
||||
FeNameFormat.checkCommonName("workload group tag", tag);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException("tag format is illegal, " + tagStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// internal workload group is usually created by Doris.
|
||||
// If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP.
|
||||
String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
|
||||
if (!StringUtils.isEmpty(interTypeId)) {
|
||||
int wid = Integer.valueOf(interTypeId);
|
||||
if (TWorkloadType.findByValue(wid) == null) {
|
||||
throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Optional<Integer> getInternalTypeId() {
|
||||
String typeIdStr = this.properties.get(INTERNAL_TYPE);
|
||||
if (StringUtils.isEmpty(typeIdStr)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(Integer.valueOf(typeIdStr));
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@ -535,8 +567,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
return cpuHardLimit;
|
||||
}
|
||||
|
||||
public String getTag() {
|
||||
return properties.get(TAG);
|
||||
public Optional<Set<String>> getTag() {
|
||||
String tagStr = properties.get(TAG);
|
||||
if (StringUtils.isEmpty(tagStr)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
Set<String> tagSet = new HashSet<>();
|
||||
String[] ss = tagStr.split(",");
|
||||
for (String str : ss) {
|
||||
tagSet.add(str);
|
||||
}
|
||||
return Optional.of(tagSet);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -550,7 +592,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public TopicInfo toTopicInfo() {
|
||||
TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
|
||||
tWorkloadGroupInfo.setId(id);
|
||||
|
||||
long wgId = this.id;
|
||||
Optional<Integer> internalTypeId = getInternalTypeId();
|
||||
if (internalTypeId.isPresent()) {
|
||||
wgId = internalTypeId.get();
|
||||
}
|
||||
tWorkloadGroupInfo.setId(wgId);
|
||||
|
||||
tWorkloadGroupInfo.setName(name);
|
||||
tWorkloadGroupInfo.setVersion(version);
|
||||
|
||||
|
||||
@ -42,6 +42,7 @@ import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TUserIdentity;
|
||||
import org.apache.doris.thrift.TWorkloadType;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
@ -49,7 +50,6 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -62,6 +62,7 @@ import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
public static final Long DEFAULT_GROUP_ID = 1L;
|
||||
|
||||
public static final String INTERNAL_GROUP_NAME = "_internal";
|
||||
|
||||
// internal_type_id could be converted to workload group id when Workload published to BE
|
||||
// refer WorkloadGroup.toTopicInfo
|
||||
public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue());
|
||||
|
||||
public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
|
||||
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
|
||||
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
|
||||
@ -367,44 +374,84 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
LOG.info("Create workload group success: {}", workloadGroup);
|
||||
}
|
||||
|
||||
public void createInternalWorkloadGroup() {
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
// 100 is cgroup v2 default cpu_share value
|
||||
properties.put(WorkloadGroup.CPU_SHARE, "100");
|
||||
properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID));
|
||||
WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties);
|
||||
writeLock();
|
||||
try {
|
||||
if (!nameToWorkloadGroup.containsKey(wg.getName())) {
|
||||
nameToWorkloadGroup.put(wg.getName(), wg);
|
||||
idToWorkloadGroup.put(wg.getId(), wg);
|
||||
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
|
||||
// when create/alter workload group with same tag.
|
||||
// when oldWg is null it means caller is an alter stmt.
|
||||
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
|
||||
String wgTag = newWg.getTag();
|
||||
double sumOfAllMemLimit = 0;
|
||||
int sumOfAllCpuHardLimit = 0;
|
||||
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
|
||||
WorkloadGroup wg = entry.getValue();
|
||||
if (!StringUtils.equals(wgTag, wg.getTag())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (oldWg != null && entry.getKey() == oldWg.getId()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (wg.getCpuHardLimit() > 0) {
|
||||
sumOfAllCpuHardLimit += wg.getCpuHardLimit();
|
||||
}
|
||||
if (wg.getMemoryLimitPercent() > 0) {
|
||||
sumOfAllMemLimit += wg.getMemoryLimitPercent();
|
||||
}
|
||||
Optional<Set<String>> newWgTag = newWg.getTag();
|
||||
Set<String> newWgTagSet = null;
|
||||
if (newWgTag.isPresent()) {
|
||||
newWgTagSet = newWgTag.get();
|
||||
} else {
|
||||
newWgTagSet = new HashSet<>();
|
||||
newWgTagSet.add(null);
|
||||
}
|
||||
|
||||
sumOfAllMemLimit += newWg.getMemoryLimitPercent();
|
||||
sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
|
||||
for (String newWgOneTag : newWgTagSet) {
|
||||
double sumOfAllMemLimit = 0;
|
||||
int sumOfAllCpuHardLimit = 0;
|
||||
|
||||
if (sumOfAllMemLimit > 100.0 + 1e-6) {
|
||||
throw new DdlException(
|
||||
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
|
||||
+ " cannot be greater than 100.0%.");
|
||||
}
|
||||
// 1 get sum value of all wg which has same tag without current wg
|
||||
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
|
||||
WorkloadGroup wg = entry.getValue();
|
||||
Optional<Set<String>> wgTag = wg.getTag();
|
||||
|
||||
if (sumOfAllCpuHardLimit > 100) {
|
||||
throw new DdlException(
|
||||
"sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
|
||||
+ wgTag + " can not be greater than 100% ");
|
||||
if (oldWg != null && entry.getKey() == oldWg.getId()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (newWgOneTag == null) {
|
||||
if (wgTag.isPresent()) {
|
||||
continue;
|
||||
}
|
||||
} else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (wg.getCpuHardLimit() > 0) {
|
||||
sumOfAllCpuHardLimit += wg.getCpuHardLimit();
|
||||
}
|
||||
if (wg.getMemoryLimitPercent() > 0) {
|
||||
sumOfAllMemLimit += wg.getMemoryLimitPercent();
|
||||
}
|
||||
}
|
||||
|
||||
// 2 sum current wg value
|
||||
sumOfAllMemLimit += newWg.getMemoryLimitPercent();
|
||||
sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
|
||||
|
||||
// 3 check total sum
|
||||
if (sumOfAllMemLimit > 100.0 + 1e-6) {
|
||||
throw new DdlException(
|
||||
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + (
|
||||
newWgTag.isPresent() ? newWgTag.get() : "")
|
||||
+ " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
|
||||
}
|
||||
|
||||
if (sumOfAllCpuHardLimit > 100) {
|
||||
throw new DdlException(
|
||||
"sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
|
||||
newWgTag.isPresent()
|
||||
? newWgTag.get() : "") + " can not be greater than 100% ");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -438,8 +485,8 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
|
||||
String workloadGroupName = stmt.getWorkloadGroupName();
|
||||
if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
|
||||
throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");
|
||||
if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
|
||||
throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed");
|
||||
}
|
||||
|
||||
// if a workload group exists in user property, it should not be dropped
|
||||
|
||||
@ -235,4 +235,226 @@ public class WorkloadGroupMgrTest {
|
||||
}
|
||||
Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiTagCreateWorkloadGroup() throws UserException {
|
||||
Config.enable_workload_group = true;
|
||||
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
|
||||
|
||||
{
|
||||
String name = "empty_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
|
||||
properties.put(WorkloadGroup.TAG, "");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "empty_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn1,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn3,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
String name = "not_empty_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
properties.put(WorkloadGroup.TAG, "cn2,cn100");
|
||||
try {
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
} catch (DdlException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
properties.put(WorkloadGroup.TAG, "cn3,cn100");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g5";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
properties.put(WorkloadGroup.TAG, "cn5");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g6";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn5");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g7";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
properties.put(WorkloadGroup.TAG, "cn5");
|
||||
try {
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
} catch (DdlException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMultiTagAlterWorkloadGroup() throws UserException {
|
||||
Config.enable_workload_group = true;
|
||||
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
|
||||
{
|
||||
String name = "empty_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
|
||||
properties.put(WorkloadGroup.TAG, "");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "empty_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn1,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn3,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn2,cn100");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
properties.put(WorkloadGroup.TAG, "cn2,cn100");
|
||||
AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties);
|
||||
try {
|
||||
workloadGroupMgr.alterWorkloadGroup(alterStmt);
|
||||
} catch (DdlException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException {
|
||||
Config.enable_workload_group = true;
|
||||
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
|
||||
|
||||
{
|
||||
String name = "not_empty_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn1,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "not_empty_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
properties.put(WorkloadGroup.TAG, "cn3,cn2");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
// create not tag workload group
|
||||
{
|
||||
String name = "no_tag_g1";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
|
||||
properties.put(WorkloadGroup.TAG, "");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "no_tag_g2";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
|
||||
{
|
||||
String name = "no_tag_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
try {
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
} catch (DdlException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
String name = "no_tag_g3";
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
|
||||
CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
|
||||
workloadGroupMgr.createWorkloadGroup(createStmt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,6 +153,7 @@ public abstract class TestWithFeService {
|
||||
@BeforeAll
|
||||
public final void beforeAll() throws Exception {
|
||||
FeConstants.enableInternalSchemaDb = false;
|
||||
FeConstants.shouldCreateInternalWorkloadGroup = false;
|
||||
beforeCreatingConnectContext();
|
||||
connectContext = createDefaultCtx();
|
||||
beforeCluster();
|
||||
|
||||
@ -243,6 +243,9 @@ struct TPublishTopicResult {
|
||||
1: required Status.TStatus status
|
||||
}
|
||||
|
||||
enum TWorkloadType {
|
||||
INTERNAL = 2
|
||||
}
|
||||
|
||||
service BackendService {
|
||||
// Called by coord to start asynchronous execution of plan fragment in backend.
|
||||
|
||||
@ -168,6 +168,30 @@ suite("test_crud_wlg") {
|
||||
exception "can not be greater than 100%"
|
||||
}
|
||||
|
||||
// test alter tag and type
|
||||
test {
|
||||
sql "alter workload group test_group properties ( 'internal_type'='13' );"
|
||||
|
||||
exception "internal_type can not be create or modified"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "create workload group inter_wg properties('internal_type'='123');"
|
||||
exception "internal_type can not be create or modified"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "alter workload group normal properties ('tag'='123')"
|
||||
|
||||
exception "_internal and normal group can not set tag"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "alter workload group _internal properties ('tag'='123')"
|
||||
|
||||
exception "_internal and normal group can not set tag"
|
||||
}
|
||||
|
||||
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
|
||||
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
|
||||
qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
|
||||
@ -475,6 +499,11 @@ suite("test_crud_wlg") {
|
||||
|
||||
|
||||
// test workload group's tag property, cpu_hard_limit
|
||||
test {
|
||||
sql "create workload group tag_test properties('tag'=' a, b , c ');"
|
||||
exception "tag format is illegal"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='101%', 'tag'='tag1')"
|
||||
exception "must be a positive integer"
|
||||
|
||||
Reference in New Issue
Block a user