add workload scheduler in be (#29116)

This commit is contained in:
wangbo
2023-12-28 15:04:22 +08:00
committed by GitHub
parent 118775f913
commit 31b3be456c
25 changed files with 1027 additions and 6 deletions

View File

@ -230,6 +230,7 @@ import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
@ -998,6 +999,8 @@ public class Env {
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
workloadGroupMgr.startUpdateThread();

View File

@ -59,9 +59,6 @@ public class TopicPublisherThread extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
if (!Config.enable_workload_group) {
return;
}
LOG.info("begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
@ -69,6 +66,10 @@ public class TopicPublisherThread extends MasterDaemon {
topicPublisher.getTopicInfo(request);
}
if (request.getTopicMap().size() == 0) {
return;
}
// step 2: publish topic info to all be
Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values();
AckResponseHandler handler = new AckResponseHandler(nodesToPublish);

View File

@ -20,6 +20,9 @@ package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import java.util.List;
public class WorkloadGroupPublisher implements TopicPublisher {
@ -31,7 +34,9 @@ public class WorkloadGroupPublisher implements TopicPublisher {
@Override
public void getTopicInfo(TPublishTopicRequest req) {
req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
env.getWorkloadGroupMgr().getPublishTopicInfo());
List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo();
if (list.size() > 0) {
req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
}
}
}

View File

@ -22,8 +22,16 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TWorkloadAction;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadCondition;
import org.apache.doris.thrift.TWorkloadMetricType;
import org.apache.doris.thrift.TWorkloadSchedPolicy;
import org.apache.doris.thrift.TopicInfo;
import com.esotericsoftware.minlog.Log;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
@ -41,6 +49,23 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).build();
// used for convert fe type to thrift type
private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME).build();
private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
.put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();
private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
= new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
.put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
.put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
.put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
.put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
.put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
@ -173,6 +198,48 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
return actionMetaList;
}
public TopicInfo toTopicInfo() {
TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy();
tPolicy.setId(id);
tPolicy.setName(name);
tPolicy.setVersion(version);
tPolicy.setPriority(priority);
tPolicy.setEnabled(enabled);
List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
TWorkloadCondition tCond = new TWorkloadCondition();
TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
if (metricType == null) {
return null;
}
tCond.setMetricName(metricType);
tCond.setOp(OP_MAP.get(cond.op));
tCond.setValue(cond.value);
condList.add(tCond);
}
List<TWorkloadAction> actionList = new ArrayList();
for (WorkloadActionMeta action : actionMetaList) {
TWorkloadAction tAction = new TWorkloadAction();
TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
if (tActionType == null) {
return null;
}
tAction.setAction(tActionType);
tAction.setActionArgs(action.actionArgs);
actionList.add(tAction);
}
tPolicy.setConditionList(condList);
tPolicy.setActionList(actionList);
TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadSchedPolicy(tPolicy);
return topicInfo;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);

View File

@ -35,6 +35,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -412,6 +413,22 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
lock.writeLock().unlock();
}
public List<TopicInfo> getPublishTopicInfoList() {
List<TopicInfo> topicInfoList = new ArrayList();
readLock();
try {
for (Map.Entry<Long, WorkloadSchedPolicy> entry : idToPolicy.entrySet()) {
TopicInfo tInfo = entry.getValue().toTopicInfo();
if (tInfo != null) {
topicInfoList.add(tInfo);
}
}
} finally {
readUnlock();
}
return topicInfoList;
}
public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import java.util.List;
public class WorkloadSchedPolicyPublisher implements TopicPublisher {
private Env env;
public WorkloadSchedPolicyPublisher(Env env) {
this.env = env;
}
@Override
public void getTopicInfo(TPublishTopicRequest req) {
List<TopicInfo> list = env.getWorkloadSchedPolicyMgr().getPublishTopicInfoList();
if (list.size() > 0) {
req.putToTopicMap(TTopicInfoType.WORKLOAD_SCHED_POLICY, list);
}
}
}