[Fix]delete internal group (#46351)
This commit is contained in:
@ -223,7 +223,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
|
||||
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
|
||||
RETURN_IF_ERROR(init_pipeline_task_scheduler());
|
||||
_workload_group_manager = new WorkloadGroupMgr();
|
||||
_workload_group_manager->init_internal_workload_group();
|
||||
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
|
||||
_fragment_mgr = new FragmentMgr(this);
|
||||
_result_cache = new ResultCache(config::query_cache_max_size_mb,
|
||||
@ -297,8 +296,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
|
||||
return st;
|
||||
}
|
||||
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
|
||||
WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
|
||||
if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
|
||||
if (st = _storage_engine->start_bg_threads(nullptr); !st.ok()) {
|
||||
LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
|
||||
return st;
|
||||
}
|
||||
|
||||
@ -34,25 +34,6 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
void WorkloadGroupMgr::init_internal_workload_group() {
|
||||
WorkloadGroupPtr internal_wg = nullptr;
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
|
||||
if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
|
||||
WorkloadGroupInfo internal_wg_info {
|
||||
.id = INTERNAL_WORKLOAD_GROUP_ID,
|
||||
.name = INTERNAL_WORKLOAD_GROUP_NAME,
|
||||
.cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
|
||||
internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
|
||||
_workload_groups[internal_wg_info.id] = internal_wg;
|
||||
}
|
||||
}
|
||||
DCHECK(internal_wg != nullptr);
|
||||
if (internal_wg) {
|
||||
internal_wg->create_cgroup_cpu_ctl();
|
||||
}
|
||||
}
|
||||
|
||||
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
|
||||
const WorkloadGroupInfo& workload_group_info) {
|
||||
{
|
||||
@ -105,10 +86,6 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
|
||||
old_wg_size = _workload_groups.size();
|
||||
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
|
||||
uint64_t wg_id = iter->first;
|
||||
// internal workload group created by BE can not be dropped
|
||||
if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
|
||||
continue;
|
||||
}
|
||||
auto workload_group_ptr = iter->second;
|
||||
if (used_wg_id.find(wg_id) == used_wg_id.end()) {
|
||||
workload_group_ptr->shutdown();
|
||||
|
||||
@ -36,18 +36,11 @@ class TaskScheduler;
|
||||
class MultiCoreTaskQueue;
|
||||
} // namespace pipeline
|
||||
|
||||
// internal_group is used for doris internal workload, currently is mainly compaction
|
||||
const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
|
||||
static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
|
||||
const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
|
||||
|
||||
class WorkloadGroupMgr {
|
||||
public:
|
||||
WorkloadGroupMgr() = default;
|
||||
~WorkloadGroupMgr() = default;
|
||||
|
||||
void init_internal_workload_group();
|
||||
|
||||
WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
|
||||
|
||||
void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
|
||||
@ -69,11 +62,6 @@ public:
|
||||
|
||||
void get_wg_resource_usage(vectorized::Block* block);
|
||||
|
||||
WorkloadGroupPtr get_internal_wg() {
|
||||
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
|
||||
return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
|
||||
}
|
||||
|
||||
void refresh_workload_group_metrics();
|
||||
|
||||
private:
|
||||
|
||||
@ -62,15 +62,10 @@ public class AlterWorkloadGroupStmt extends DdlStmt {
|
||||
throw new AnalysisException("Workload Group properties can't be empty");
|
||||
}
|
||||
|
||||
if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
|
||||
throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(WorkloadGroup.TAG);
|
||||
if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
|
||||
|| WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
|
||||
if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName))) {
|
||||
throw new AnalysisException(
|
||||
WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
+ " group can not set tag");
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,15 +74,11 @@ public class CreateWorkloadGroupStmt extends DdlStmt {
|
||||
throw new AnalysisException("Workload Group properties can't be empty");
|
||||
}
|
||||
|
||||
if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
|
||||
throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(WorkloadGroup.TAG);
|
||||
if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
|
||||
|| WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
|
||||
if (!StringUtils.isEmpty(tagStr)
|
||||
&& (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName))) {
|
||||
throw new AnalysisException(
|
||||
WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
WorkloadGroupMgr.DEFAULT_GROUP_NAME
|
||||
+ " group can not set tag");
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,7 +246,6 @@ import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.resource.AdmissionControl;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
|
||||
@ -1758,7 +1757,6 @@ public class Env {
|
||||
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
|
||||
topicPublisherThread.addToTopicPublisherList(wpPublisher);
|
||||
topicPublisherThread.start();
|
||||
new CreateInternalWorkloadGroupThread().start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1,55 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class CreateInternalWorkloadGroupThread extends Thread {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
|
||||
|
||||
public CreateInternalWorkloadGroupThread() {
|
||||
super("CreateInternalWorkloadGroupThread");
|
||||
}
|
||||
|
||||
public void run() {
|
||||
if (!FeConstants.shouldCreateInternalWorkloadGroup) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Env env = Env.getCurrentEnv();
|
||||
while (!env.isReady()) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
if (!env.getWorkloadGroupMgr()
|
||||
.isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
|
||||
env.getWorkloadGroupMgr().createInternalWorkloadGroup();
|
||||
LOG.info("create internal workload group succ");
|
||||
} else {
|
||||
LOG.info("internal workload group already exists.");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("create internal workload group failed. ", t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -30,10 +30,8 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TWorkloadGroupInfo;
|
||||
import org.apache.doris.thrift.TWorkloadType;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -85,11 +83,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
|
||||
|
||||
// it's used to define Doris's internal workload group,
|
||||
// currently it is internal, only contains compaction
|
||||
// later more type and workload may be included in the future.
|
||||
public static final String INTERNAL_TYPE = "internal_type";
|
||||
|
||||
// NOTE(wb): all property is not required, some properties default value is set in be
|
||||
// default value is as followed
|
||||
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
|
||||
@ -98,10 +91,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
|
||||
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
|
||||
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
|
||||
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
|
||||
|
||||
public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
|
||||
.put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build();
|
||||
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
|
||||
|
||||
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
|
||||
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
|
||||
@ -487,25 +477,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
// internal workload group is usually created by Doris.
|
||||
// If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP.
|
||||
String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
|
||||
if (!StringUtils.isEmpty(interTypeId)) {
|
||||
int wid = Integer.valueOf(interTypeId);
|
||||
if (TWorkloadType.findByValue(wid) == null) {
|
||||
throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Optional<Integer> getInternalTypeId() {
|
||||
String typeIdStr = this.properties.get(INTERNAL_TYPE);
|
||||
if (StringUtils.isEmpty(typeIdStr)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(Integer.valueOf(typeIdStr));
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@ -601,13 +572,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
public TopicInfo toTopicInfo() {
|
||||
TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
|
||||
|
||||
long wgId = this.id;
|
||||
Optional<Integer> internalTypeId = getInternalTypeId();
|
||||
if (internalTypeId.isPresent()) {
|
||||
wgId = internalTypeId.get();
|
||||
}
|
||||
tWorkloadGroupInfo.setId(wgId);
|
||||
tWorkloadGroupInfo.setId(this.id);
|
||||
|
||||
tWorkloadGroupInfo.setName(name);
|
||||
tWorkloadGroupInfo.setVersion(version);
|
||||
|
||||
@ -42,7 +42,6 @@ import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TUserIdentity;
|
||||
import org.apache.doris.thrift.TWorkloadType;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
@ -72,12 +71,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
public static final Long DEFAULT_GROUP_ID = 1L;
|
||||
|
||||
public static final String INTERNAL_GROUP_NAME = "_internal";
|
||||
|
||||
// internal_type_id could be converted to workload group id when Workload published to BE
|
||||
// refer WorkloadGroup.toTopicInfo
|
||||
public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue());
|
||||
|
||||
public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
|
||||
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
|
||||
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
|
||||
@ -374,24 +367,6 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
LOG.info("Create workload group success: {}", workloadGroup);
|
||||
}
|
||||
|
||||
public void createInternalWorkloadGroup() {
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
// 100 is cgroup v2 default cpu_share value
|
||||
properties.put(WorkloadGroup.CPU_SHARE, "100");
|
||||
properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID));
|
||||
WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties);
|
||||
writeLock();
|
||||
try {
|
||||
if (!nameToWorkloadGroup.containsKey(wg.getName())) {
|
||||
nameToWorkloadGroup.put(wg.getName(), wg);
|
||||
idToWorkloadGroup.put(wg.getId(), wg);
|
||||
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
|
||||
// when create/alter workload group with same tag.
|
||||
// when oldWg is null it means caller is an alter stmt.
|
||||
@ -485,7 +460,7 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
|
||||
public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
|
||||
String workloadGroupName = stmt.getWorkloadGroupName();
|
||||
if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
|
||||
if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
|
||||
throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed");
|
||||
}
|
||||
|
||||
|
||||
@ -243,9 +243,6 @@ struct TPublishTopicResult {
|
||||
1: required Status.TStatus status
|
||||
}
|
||||
|
||||
enum TWorkloadType {
|
||||
INTERNAL = 2
|
||||
}
|
||||
|
||||
service BackendService {
|
||||
// Called by coord to start asynchronous execution of plan fragment in backend.
|
||||
|
||||
@ -169,30 +169,6 @@ suite("test_crud_wlg") {
|
||||
exception "can not be greater than 100%"
|
||||
}
|
||||
|
||||
// test alter tag and type
|
||||
test {
|
||||
sql "alter workload group test_group properties ( 'internal_type'='13' );"
|
||||
|
||||
exception "internal_type can not be create or modified"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "create workload group inter_wg properties('internal_type'='123');"
|
||||
exception "internal_type can not be create or modified"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "alter workload group normal properties ('tag'='123')"
|
||||
|
||||
exception "_internal and normal group can not set tag"
|
||||
}
|
||||
|
||||
test {
|
||||
sql "alter workload group _internal properties ('tag'='123')"
|
||||
|
||||
exception "_internal and normal group can not set tag"
|
||||
}
|
||||
|
||||
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
|
||||
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
|
||||
qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
|
||||
|
||||
Reference in New Issue
Block a user