[feature][executor]support workload schedule policy (#28443)

This commit is contained in:
wangbo
2023-12-19 18:00:02 +08:00
committed by GitHub
parent c72191eb9e
commit 71b7dcfb8f
38 changed files with 2150 additions and 3 deletions

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import java.util.Map;
public class AlterWorkloadSchedPolicyStmt extends DdlStmt {
private final String policyName;
private final Map<String, String> properties;
public AlterWorkloadSchedPolicyStmt(String policyName, Map<String, String> properties) {
this.policyName = policyName;
this.properties = properties;
}
public String getPolicyName() {
return policyName;
}
public Map<String, String> getProperties() {
return properties;
}
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
if (properties == null || properties.isEmpty()) {
throw new AnalysisException("properties can't be null when alter workload schedule policy");
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("ALTER WORKLOAD SCHEDULE POLICY ");
sb.append(policyName);
sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
return sb.toString();
}
}

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadschedpolicy.WorkloadActionMeta;
import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionMeta;
import java.util.List;
import java.util.Map;
public class CreateWorkloadSchedPolicyStmt extends DdlStmt {
private final boolean ifNotExists;
private final String policyName;
private final List<WorkloadConditionMeta> conditions;
private final List<WorkloadActionMeta> actions;
private final Map<String, String> properties;
public CreateWorkloadSchedPolicyStmt(boolean ifNotExists, String policyName,
List<WorkloadConditionMeta> conditions, List<WorkloadActionMeta> actions, Map<String, String> properties) {
this.ifNotExists = ifNotExists;
this.policyName = policyName;
this.conditions = conditions;
this.actions = actions;
this.properties = properties;
}
public boolean isIfNotExists() {
return ifNotExists;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
// check name
FeNameFormat.checkWorkloadSchedPolicyName(policyName);
if (conditions == null || conditions.size() < 1) {
throw new DdlException("At least one condition needs to be specified");
}
if (actions == null || actions.size() < 1) {
throw new DdlException("At least one action needs to be specified");
}
}
public String getPolicyName() {
return policyName;
}
public List<WorkloadConditionMeta> getConditions() {
return conditions;
}
public List<WorkloadActionMeta> getActions() {
return actions;
}
public Map<String, String> getProperties() {
return properties;
}
}

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
public class DropWorkloadSchedPolicyStmt extends DdlStmt {
private boolean ifExists;
private String policyName;
public DropWorkloadSchedPolicyStmt(boolean ifExists, String policyName) {
this.ifExists = ifExists;
this.policyName = policyName;
}
public boolean isIfExists() {
return ifExists;
}
public String getPolicyName() {
return policyName;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
FeNameFormat.checkWorkloadSchedPolicyName(policyName);
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("DROP ");
sb.append("WORKLOAD SCHEDULE POLICY '").append(policyName).append("' ");
return sb.toString();
}
}

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
public class ShowWorkloadSchedPolicyStmt extends ShowStmt {
public ShowWorkloadSchedPolicyStmt() {
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
}
@Override
public String toSql() {
return "SHOW WORKLOAD SCHEDULE POLICY";
}
@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : WorkloadSchedPolicyMgr.WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES) {
builder.addColumn(new Column(title, ScalarType.createVarchar(1000)));
}
return builder.build();
}
@Override
public RedirectStatus getRedirectStatus() {
if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
return RedirectStatus.FORWARD_NO_SYNC;
} else {
return RedirectStatus.NO_FORWARD;
}
}
}

View File

@ -111,6 +111,7 @@ import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadActionPublishThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DynamicPartitionUtil;
@ -227,6 +228,7 @@ import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
@ -484,6 +486,7 @@ public class Env {
private WorkloadGroupMgr workloadGroupMgr;
private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;
private QueryStats queryStats;
private StatisticsCleaner statisticsCleaner;
@ -505,6 +508,8 @@ public class Env {
private TopicPublisherThread topicPublisherThread;
private WorkloadActionPublishThread workloadActionPublisherThread;
private MTMVService mtmvService;
public List<TFrontendInfo> getFrontendInfos() {
@ -724,6 +729,7 @@ public class Env {
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
this.queryStats = new QueryStats();
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
@ -733,6 +739,8 @@ public class Env {
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
this.topicPublisherThread = new TopicPublisherThread(
"TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo);
this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher",
Config.workload_action_interval_ms, systemInfo);
this.mtmvService = new MTMVService();
}
@ -809,6 +817,10 @@ public class Env {
return workloadGroupMgr;
}
public WorkloadSchedPolicyMgr getWorkloadSchedPolicyMgr() {
return workloadSchedPolicyMgr;
}
// use this to get correct ClusterInfoService instance
public static SystemInfoService getCurrentSystemInfo() {
return getCurrentEnv().getClusterInfo();
@ -984,6 +996,8 @@ public class Env {
topicPublisherThread.start();
workloadGroupMgr.startUpdateThread();
workloadSchedPolicyMgr.start();
workloadActionPublisherThread.start();
}
// wait until FE is ready.
@ -2066,6 +2080,12 @@ public class Env {
return checksum;
}
public long loadWorkloadSchedPolicy(DataInputStream in, long checksum) throws IOException {
workloadSchedPolicyMgr = WorkloadSchedPolicyMgr.read(in);
LOG.info("finished replay workload sched policy from image");
return checksum;
}
public long loadSmallFiles(DataInputStream in, long checksum) throws IOException {
smallFileMgr.readFields(in);
LOG.info("finished replay smallFiles from image");
@ -2333,6 +2353,11 @@ public class Env {
return checksum;
}
public long saveWorkloadSchedPolicy(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getWorkloadSchedPolicyMgr().write(dos);
return checksum;
}
public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException {
smallFileMgr.write(dos);
return checksum;

View File

@ -134,6 +134,10 @@ public class FeNameFormat {
checkCommonName("workload group", workloadGroupName);
}
public static void checkWorkloadSchedPolicyName(String policyName) throws AnalysisException {
checkCommonName("workload schedule policy", policyName);
}
public static void checkCommonName(String type, String name) throws AnalysisException {
if (Strings.isNullOrEmpty(name) || !name.matches(getCommonNameRegex())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_NAME_FORMAT, type, name);

View File

@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class WorkloadActionPublishThread extends Daemon {
private ExecutorService executor = ThreadPoolManager
.newDaemonFixedThreadPool(4, 256, "workload-action-publish-thread", true);
private static final Logger LOG = LogManager.getLogger(WorkloadActionPublishThread.class);
public static Map<TTopicInfoType, List<TopicInfo>> workloadActionToplicInfoMap
= new HashMap<TTopicInfoType, List<TopicInfo>>();
public static synchronized void putWorkloadAction(TTopicInfoType type, TopicInfo topicInfo) {
List<TopicInfo> list = workloadActionToplicInfoMap.get(type);
if (list == null) {
list = new ArrayList<TopicInfo>();
workloadActionToplicInfoMap.put(type, list);
}
list.add(topicInfo);
}
public static synchronized Map<TTopicInfoType, List<TopicInfo>> getCurrentWorkloadActionMap() {
Map<TTopicInfoType, List<TopicInfo>> retMap = workloadActionToplicInfoMap;
workloadActionToplicInfoMap = new HashMap<TTopicInfoType, List<TopicInfo>>();
return retMap;
}
private SystemInfoService clusterInfoService;
public WorkloadActionPublishThread(String name, long intervalMs,
SystemInfoService clusterInfoService) {
super(name, intervalMs);
this.clusterInfoService = clusterInfoService;
}
@Override
protected final void runOneCycle() {
Map<TTopicInfoType, List<TopicInfo>> actionMap
= WorkloadActionPublishThread.getCurrentWorkloadActionMap();
if (actionMap.size() == 0) {
LOG.info("no workload action found, skip publish");
return;
}
Collection<Backend> currentBeToPublish = clusterInfoService.getIdToBackend().values();
AckResponseHandler handler = new AckResponseHandler(currentBeToPublish);
TPublishTopicRequest request = new TPublishTopicRequest();
request.setTopicMap(actionMap);
for (Backend be : currentBeToPublish) {
executor.submit(new WorkloadMoveActionTask(request, be, handler));
}
}
public class WorkloadMoveActionTask implements Runnable {
private TPublishTopicRequest request;
private Backend be;
private ResponseHandler handler;
public WorkloadMoveActionTask(TPublishTopicRequest request, Backend be,
ResponseHandler handler) {
this.request = request;
this.be = be;
this.handler = handler;
}
@Override
public void run() {
long beginTime = System.currentTimeMillis();
try {
TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort());
BackendService.Client client = ClientPool.backendPool.borrowObject(addr);
client.publishTopicInfo(request);
LOG.info("publish move action topic to be {} success, time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime));
} catch (Exception e) {
LOG.warn("publish move action topic to be {} error happens: , time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime), e);
} finally {
handler.onResponse(be);
}
}
}
}

View File

@ -85,6 +85,7 @@ import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.DropSqlBlockRuleOperationLog;
import org.apache.doris.persist.DropWorkloadGroupOperationLog;
import org.apache.doris.persist.DropWorkloadSchedPolicyOperatorLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.persist.LdapInfo;
@ -119,6 +120,7 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Backend;
@ -806,6 +808,17 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY:
case OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY: {
data = WorkloadSchedPolicy.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_WORKLOAD_SCHED_POLICY: {
data = DropWorkloadSchedPolicyOperatorLog.read(in);
isRead = true;
break;
}
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
data = AlterLightSchemaChangeInfo.read(in);
isRead = true;

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class DropWorkloadSchedPolicyOperatorLog implements Writable {
@SerializedName(value = "id")
private long id;
public DropWorkloadSchedPolicyOperatorLog(long id) {
this.id = id;
}
public long getId() {
return id;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static DropWorkloadSchedPolicyOperatorLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), DropWorkloadSchedPolicyOperatorLog.class);
}
}

View File

@ -79,6 +79,7 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.TableStatsMeta;
@ -1048,6 +1049,22 @@ public class EditLog {
env.getWorkloadGroupMgr().replayAlterWorkloadGroup(resource);
break;
}
case OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY: {
final WorkloadSchedPolicy policy = (WorkloadSchedPolicy) journal.getData();
env.getWorkloadSchedPolicyMgr().replayCreateWorkloadSchedPolicy(policy);
break;
}
case OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY: {
final WorkloadSchedPolicy policy = (WorkloadSchedPolicy) journal.getData();
env.getWorkloadSchedPolicyMgr().replayAlterWorkloadSchedPolicy(policy);
break;
}
case OperationType.OP_DROP_WORKLOAD_SCHED_POLICY: {
final DropWorkloadSchedPolicyOperatorLog dropLog
= (DropWorkloadSchedPolicyOperatorLog) journal.getData();
env.getWorkloadSchedPolicyMgr().replayDropWorkloadSchedPolicy(dropLog.getId());
break;
}
case OperationType.OP_INIT_EXTERNAL_TABLE: {
// Do nothing.
break;
@ -1678,6 +1695,18 @@ public class EditLog {
logEdit(OperationType.OP_DROP_WORKLOAD_GROUP, operationLog);
}
public void logCreateWorkloadSchedPolicy(WorkloadSchedPolicy workloadSchedPolicy) {
logEdit(OperationType.OP_CREATE_WORKLOAD_SCHED_POLICY, workloadSchedPolicy);
}
public void logAlterWorkloadSchedPolicy(WorkloadSchedPolicy workloadSchedPolicy) {
logEdit(OperationType.OP_ALTER_WORKLOAD_SCHED_POLICY, workloadSchedPolicy);
}
public void dropWorkloadSchedPolicy(long policyId) {
logEdit(OperationType.OP_DROP_WORKLOAD_SCHED_POLICY, new DropWorkloadSchedPolicyOperatorLog(policyId));
}
public void logAlterStoragePolicy(StoragePolicy storagePolicy) {
logEdit(OperationType.OP_ALTER_STORAGE_POLICY, storagePolicy);
}

View File

@ -304,6 +304,9 @@ public class OperationType {
public static final short OP_CREATE_WORKLOAD_GROUP = 410;
public static final short OP_DROP_WORKLOAD_GROUP = 411;
public static final short OP_ALTER_WORKLOAD_GROUP = 412;
public static final short OP_CREATE_WORKLOAD_SCHED_POLICY = 413;
public static final short OP_ALTER_WORKLOAD_SCHED_POLICY = 414;
public static final short OP_DROP_WORKLOAD_SCHED_POLICY = 415;
// query stats 440 ~ 424
public static final short OP_CLEAN_QUERY_STATS = 420;

View File

@ -212,6 +212,13 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveWorkloadGroups", CountingDataOutputStream.class, long.class);
break;
case "workloadSchedPolicy":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadWorkloadSchedPolicy", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveWorkloadSchedPolicy", CountingDataOutputStream.class,
long.class);
break;
case "binlogs":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadBinlogs", DataInputStream.class, long.class);

View File

@ -39,7 +39,7 @@ public class PersistMetaModules {
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager");
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy");
// Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)
public static final ImmutableList<String> DEPRECATED_MODULE_NAMES = ImmutableList.of(

View File

@ -104,7 +104,7 @@ public class ConnectContext {
protected volatile long backendId;
protected volatile LoadTaskInfo streamLoadInfo;
protected volatile TUniqueId queryId;
protected volatile TUniqueId queryId = null;
protected volatile String traceId;
// id for this connection
protected volatile int connectionId;

View File

@ -170,4 +170,8 @@ public class ConnectScheduler {
TUniqueId queryId = traceId2QueryId.get(traceId);
return queryId == null ? "" : DebugUtil.printId(queryId);
}
public Map<Integer, ConnectContext> getConnectionMap() {
return connectionMap;
}
}

View File

@ -47,6 +47,7 @@ import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterUserStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.AlterWorkloadGroupStmt;
import org.apache.doris.analysis.AlterWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
@ -77,6 +78,7 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateUserStmt;
import org.apache.doris.analysis.CreateViewStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.CreateWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.analysis.DropAnalyzeJobStmt;
import org.apache.doris.analysis.DropCatalogStmt;
@ -94,6 +96,7 @@ import org.apache.doris.analysis.DropStatsStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.DropUserStmt;
import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
@ -285,6 +288,12 @@ public class DdlExecutor {
env.getWorkloadGroupMgr().createWorkloadGroup((CreateWorkloadGroupStmt) ddlStmt);
} else if (ddlStmt instanceof DropWorkloadGroupStmt) {
env.getWorkloadGroupMgr().dropWorkloadGroup((DropWorkloadGroupStmt) ddlStmt);
} else if (ddlStmt instanceof CreateWorkloadSchedPolicyStmt) {
env.getWorkloadSchedPolicyMgr().createWorkloadSchedPolicy((CreateWorkloadSchedPolicyStmt) ddlStmt);
} else if (ddlStmt instanceof AlterWorkloadSchedPolicyStmt) {
env.getWorkloadSchedPolicyMgr().alterWorkloadSchedPolicy((AlterWorkloadSchedPolicyStmt) ddlStmt);
} else if (ddlStmt instanceof DropWorkloadSchedPolicyStmt) {
env.getWorkloadSchedPolicyMgr().dropWorkloadSchedPolicy((DropWorkloadSchedPolicyStmt) ddlStmt);
} else if (ddlStmt instanceof CreateDataSyncJobStmt) {
CreateDataSyncJobStmt createSyncJobStmt = (CreateDataSyncJobStmt) ddlStmt;
SyncJobManager syncJobMgr = env.getSyncJobManager();

View File

@ -105,6 +105,7 @@ import org.apache.doris.analysis.ShowUserPropertyStmt;
import org.apache.doris.analysis.ShowVariablesStmt;
import org.apache.doris.analysis.ShowViewStmt;
import org.apache.doris.analysis.ShowWorkloadGroupsStmt;
import org.apache.doris.analysis.ShowWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.backup.AbstractJob;
import org.apache.doris.backup.BackupJob;
@ -348,6 +349,8 @@ public class ShowExecutor {
handleShowResources();
} else if (stmt instanceof ShowWorkloadGroupsStmt) {
handleShowWorkloadGroups();
} else if (stmt instanceof ShowWorkloadSchedPolicyStmt) {
handleShowWorkloadSchedPolicy();
} else if (stmt instanceof ShowExportStmt) {
handleShowExport();
} else if (stmt instanceof ShowBackendsStmt) {
@ -1951,6 +1954,12 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), workloadGroupsInfos);
}
private void handleShowWorkloadSchedPolicy() {
ShowWorkloadSchedPolicyStmt showStmt = (ShowWorkloadSchedPolicyStmt) stmt;
List<List<String>> workloadSchedInfo = Env.getCurrentEnv().getWorkloadSchedPolicyMgr().getShowPolicyInfo();
resultSet = new ShowResultSet(showStmt.getMetaData(), workloadSchedInfo);
}
private void handleShowExport() throws AnalysisException {
ShowExportStmt showExportStmt = (ShowExportStmt) stmt;
Env env = Env.getCurrentEnv();

View File

@ -432,6 +432,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return procNode.fetchResult(currentUserIdentity).getRows();
}
public Long getWorkloadGroupIdByName(String name) {
readLock();
try {
WorkloadGroup wg = nameToWorkloadGroup.get(name);
if (wg == null) {
return null;
}
return wg.getId();
} finally {
readUnlock();
}
}
// for ut
public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
return nameToWorkloadGroup;

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public interface WorkloadAction {
void exec(WorkloadQueryInfo queryInfo);
WorkloadActionType getWorkloadActionType();
// NOTE(wb) currently createPolicyAction is also used when replay meta, it better not contains heavy check
static WorkloadAction createWorkloadAction(WorkloadActionMeta workloadActionMeta)
throws UserException {
if (WorkloadActionType.CANCEL_QUERY.equals(workloadActionMeta.action)) {
return WorkloadActionCancelQuery.createWorkloadAction();
} else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(workloadActionMeta.action)) {
return WorkloadActionMoveQueryToGroup.createWorkloadAction(workloadActionMeta.actionArgs);
} else if (WorkloadActionType.SET_SESSION_VARIABLE.equals(workloadActionMeta.action)) {
return WorkloadActionSetSessionVar.createWorkloadAction(workloadActionMeta.actionArgs);
}
throw new UserException("invalid action type " + workloadActionMeta.action);
}
}

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class WorkloadActionCancelQuery implements WorkloadAction {
private static final Logger LOG = LogManager.getLogger(WorkloadActionCancelQuery.class);
@Override
public void exec(WorkloadQueryInfo queryInfo) {
if (queryInfo.context != null && !queryInfo.context.isKilled()
&& queryInfo.tUniqueId != null
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId);
queryInfo.context.cancelQuery();
}
}
public static WorkloadActionCancelQuery createWorkloadAction() {
return new WorkloadActionCancelQuery();
}
@Override
public WorkloadActionType getWorkloadActionType() {
return WorkloadActionType.CANCEL_QUERY;
}
}

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
import com.google.gson.annotations.SerializedName;
public class WorkloadActionMeta {
@SerializedName(value = "action")
public WorkloadActionType action;
@SerializedName(value = "actionArgs")
public String actionArgs;
public WorkloadActionMeta(String workloadAction, String actionArgs) throws UserException {
this.action = getWorkloadActionType(workloadAction);
this.actionArgs = actionArgs;
}
static WorkloadActionType getWorkloadActionType(String strType) throws UserException {
if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.CANCEL_QUERY;
} else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.MOVE_QUERY_TO_GROUP;
} else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.SET_SESSION_VARIABLE;
}
throw new UserException("invalid action type " + strType);
}
}

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.publish.WorkloadActionPublishThread;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TWorkloadMoveQueryToGroupAction;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class WorkloadActionMoveQueryToGroup implements WorkloadAction {
private static final Logger LOG = LogManager.getLogger(WorkloadActionMoveQueryToGroup.class);
private long dstWgId;
public WorkloadActionMoveQueryToGroup(long dstWgId) {
this.dstWgId = dstWgId;
}
@Override
public void exec(WorkloadQueryInfo queryInfo) {
if (queryInfo.context != null && !queryInfo.context.isKilled()
&& queryInfo.tUniqueId != null
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("try move query {} to group {}", queryInfo.queryId, dstWgId);
TWorkloadMoveQueryToGroupAction moveQueryToGroupAction = new TWorkloadMoveQueryToGroupAction();
moveQueryToGroupAction.setQueryId(queryInfo.tUniqueId);
moveQueryToGroupAction.setWorkloadGroupId(dstWgId);
TopicInfo topicInfo = new TopicInfo();
topicInfo.setMoveAction(moveQueryToGroupAction);
WorkloadActionPublishThread.putWorkloadAction(TTopicInfoType.MOVE_QUERY_TO_GROUP, topicInfo);
}
}
@Override
public WorkloadActionType getWorkloadActionType() {
return WorkloadActionType.MOVE_QUERY_TO_GROUP;
}
public static WorkloadActionMoveQueryToGroup createWorkloadAction(String groupId) {
long wgId = Long.parseLong(groupId);
return new WorkloadActionMoveQueryToGroup(wgId);
}
}

View File

@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.SetExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
public class WorkloadActionSetSessionVar implements WorkloadAction {
private static final Logger LOG = LogManager.getLogger(WorkloadActionSetSessionVar.class);
private String varName;
private String varValue;
public WorkloadActionSetSessionVar(String varName, String varValue) {
this.varName = varName;
this.varValue = varValue;
}
@Override
public void exec(WorkloadQueryInfo queryInfo) {
try {
List<SetVar> list = new ArrayList<>();
SetVar sv = new SetVar(varName, new StringLiteral(varValue));
list.add(sv);
SetStmt setStmt = new SetStmt(list);
SetExecutor executor = new SetExecutor(queryInfo.context, setStmt);
executor.execute();
} catch (Throwable t) {
LOG.error("error happens when exec {}", WorkloadActionType.SET_SESSION_VARIABLE, t);
}
}
@Override
public WorkloadActionType getWorkloadActionType() {
return WorkloadActionType.SET_SESSION_VARIABLE;
}
public String getVarName() {
return varName;
}
public static WorkloadAction createWorkloadAction(String actionCmdArgs) throws UserException {
String[] strs = actionCmdArgs.split("=");
if (strs.length != 2 || StringUtils.isEmpty(strs[0].trim()) || StringUtils.isEmpty(strs[1].trim())) {
throw new UserException("illegal arguments, it should be like set_session_variable \"xxx=xxx\"");
}
return new WorkloadActionSetSessionVar(strs[0].trim(), strs[1].trim());
}
}

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadActionType {
CANCEL_QUERY, // cancel query
MOVE_QUERY_TO_GROUP, // move query from one wg group to another
SET_SESSION_VARIABLE
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public interface WorkloadCondition {
boolean eval(String strValue);
WorkloadMetricType getMetricType();
// NOTE(wb) currently createPolicyCondition is also used when replay meta, it better not contains heavy check
static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm)
throws UserException {
if (WorkloadMetricType.USERNAME.equals(cm.metricName)) {
return WorkloadConditionUsername.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) {
return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, cm.value);
}
throw new UserException("invalid metric name:" + cm.metricName);
}
}

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
import org.apache.commons.lang3.StringUtils;
public class WorkloadConditionCompareUtils {
static WorkloadConditionOperator getOperator(String op) throws UserException {
if ("=".equals(op)) {
return WorkloadConditionOperator.EQUAL;
} else if (">".equals(op)) {
return WorkloadConditionOperator.GREATER;
} else if (">=".equals(op)) {
return WorkloadConditionOperator.GREATER_EQUAL;
} else if ("<".equals(op)) {
return WorkloadConditionOperator.LESS;
} else if ("<=".equals(op)) {
return WorkloadConditionOperator.LESS_EQUAl;
} else {
throw new UserException("unexpected compare operator " + op);
}
}
static boolean compareInteger(WorkloadConditionOperator operator, long firstArgs, long secondArgs) {
switch (operator) {
case EQUAL:
return firstArgs == secondArgs;
case GREATER:
return firstArgs > secondArgs;
case GREATER_EQUAL:
return firstArgs >= secondArgs;
case LESS:
return firstArgs < secondArgs;
case LESS_EQUAl:
return firstArgs <= secondArgs;
default:
throw new RuntimeException("unexpected integer operator " + operator);
}
}
static boolean compareDouble(WorkloadConditionOperator operator, double firstArgs, double secondArgs) {
switch (operator) {
case EQUAL:
return firstArgs == secondArgs;
case GREATER:
return firstArgs > secondArgs;
case GREATER_EQUAL:
return firstArgs >= secondArgs;
case LESS:
return firstArgs < secondArgs;
case LESS_EQUAl:
return firstArgs <= secondArgs;
default:
throw new RuntimeException("unexpected compare double operator " + operator);
}
}
static boolean compareString(WorkloadConditionOperator operator, String firstArgs, String secondArgs) {
switch (operator) {
case EQUAL:
return StringUtils.equals(firstArgs, secondArgs);
default:
throw new RuntimeException("unexpected compare string operator " + operator);
}
}
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
import com.google.gson.annotations.SerializedName;
public class WorkloadConditionMeta {
@SerializedName(value = "metricName")
public WorkloadMetricType metricName;
@SerializedName(value = "op")
public WorkloadConditionOperator op;
@SerializedName(value = "value")
public String value;
public WorkloadConditionMeta(String metricName, String op, String value) throws UserException {
this.metricName = getMetricType(metricName);
this.op = WorkloadConditionCompareUtils.getOperator(op);
this.value = value;
}
private static WorkloadMetricType getMetricType(String metricStr) throws UserException {
if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.USERNAME;
} else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.QUERY_TIME;
}
throw new UserException("invalid metric name:" + metricStr);
}
public String toString() {
return metricName + " " + op + " " + value;
}
}

View File

@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadConditionOperator {
EQUAL, GREATER, GREATER_EQUAL, LESS, LESS_EQUAl
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public class WorkloadConditionQueryTime implements WorkloadCondition {
private long value;
private WorkloadConditionOperator op;
public WorkloadConditionQueryTime(WorkloadConditionOperator op, long value) {
this.op = op;
this.value = value;
}
@Override
public boolean eval(String strValue) {
long inputLongValue = Long.parseLong(strValue);
return WorkloadConditionCompareUtils.compareInteger(op, inputLongValue, value);
}
public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid query time value, " + longValue + ", it requires >= 0");
}
return new WorkloadConditionQueryTime(op, longValue);
}
@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.QUERY_TIME;
}
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.hbase.thirdparty.com.google.gson.annotations.SerializedName;
public class WorkloadConditionUsername implements WorkloadCondition {
@SerializedName(value = "op")
private WorkloadConditionOperator op;
@SerializedName(value = "value")
private String value;
public WorkloadConditionUsername(WorkloadConditionOperator op, String value) {
this.op = op;
this.value = value;
}
@Override
public boolean eval(String inputStrValue) {
return WorkloadConditionCompareUtils.compareString(op, inputStrValue, value);
}
@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.USERNAME;
}
public static WorkloadConditionUsername createWorkloadCondition(WorkloadConditionOperator op, String value) {
// todo(wb) check whether input username is valid
return new WorkloadConditionUsername(op, value);
}
}

View File

@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadMetricType {
USERNAME, QUERY_TIME
}

View File

@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TUniqueId;
import java.util.Map;
public class WorkloadQueryInfo {
String queryId = null;
TUniqueId tUniqueId = null;
ConnectContext context = null;
Map<WorkloadMetricType, String> metricMap;
}

View File

@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import com.esotericsoftware.minlog.Log;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
public static final String ENABLED = "enabled";
public static final String PRIORITY = "priority";
public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).build();
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
String name;
@SerializedName(value = "version")
int version;
@SerializedName(value = "enabled")
private volatile boolean enabled;
@SerializedName(value = "priority")
private volatile int priority;
@SerializedName(value = "conditionMetaList")
List<WorkloadConditionMeta> conditionMetaList;
// we regard action as a command, map's key is command, map's value is args, so it's a command list
@SerializedName(value = "actionMetaList")
List<WorkloadActionMeta> actionMetaList;
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;
public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String> properties) throws UserException {
this.id = id;
this.name = name;
this.workloadConditionList = workloadConditionList;
this.workloadActionList = workloadActionList;
// set enable and priority
parseAndSetProperties(properties);
this.version = 0;
}
// return true, this means all conditions in policy can match queryInfo's data
// return false,
// 1 metric not match
// 2 condition value not match query info's value
boolean isMatch(WorkloadQueryInfo queryInfo) {
for (WorkloadCondition condition : workloadConditionList) {
WorkloadMetricType metricType = condition.getMetricType();
String value = queryInfo.metricMap.get(metricType);
if (value == null) {
return false; // query info's metric must match all condition's metric
}
if (!condition.eval(value)) {
return false;
}
}
return true;
}
public boolean isEnabled() {
return enabled;
}
public int getPriority() {
return priority;
}
public void execAction(WorkloadQueryInfo queryInfo) {
for (WorkloadAction action : workloadActionList) {
action.exec(queryInfo);
}
}
// move > log, cancel > log
// move and cancel can not exist at same time
public WorkloadActionType getFirstActionType() {
WorkloadActionType retType = null;
for (WorkloadAction action : workloadActionList) {
WorkloadActionType currentActionType = action.getWorkloadActionType();
if (retType == null) {
retType = currentActionType;
continue;
}
if (currentActionType == WorkloadActionType.MOVE_QUERY_TO_GROUP
|| currentActionType == WorkloadActionType.CANCEL_QUERY) {
return currentActionType;
}
}
return retType;
}
public void parseAndSetProperties(Map<String, String> properties) throws UserException {
String enabledStr = properties.get(ENABLED);
this.enabled = enabledStr == null ? true : Boolean.parseBoolean(enabledStr);
String priorityStr = properties.get(PRIORITY);
this.priority = priorityStr == null ? 0 : Integer.parseInt(priorityStr);
}
void incrementVersion() {
this.version++;
}
public void setConditionMeta(List<WorkloadConditionMeta> conditionMeta) {
this.conditionMetaList = conditionMeta;
}
public void setActionMeta(List<WorkloadActionMeta> actionMeta) {
this.actionMetaList = actionMeta;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
public long getVersion() {
return version;
}
public List<WorkloadConditionMeta> getConditionMetaList() {
return conditionMetaList;
}
public List<WorkloadActionMeta> getActionMetaList() {
return actionMetaList;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static WorkloadSchedPolicy read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicy.class);
}
@Override
public void gsonPostProcess() throws IOException {
List<WorkloadCondition> policyConditionList = new ArrayList<>();
for (WorkloadConditionMeta cm : conditionMetaList) {
try {
WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
} catch (UserException ue) {
Log.error("unexpected condition data error when replay log ", ue);
}
}
this.workloadConditionList = policyConditionList;
List<WorkloadAction> actionList = new ArrayList<>();
for (WorkloadActionMeta actionMeta : actionMetaList) {
try {
WorkloadAction ret = WorkloadAction.createWorkloadAction(actionMeta);
actionList.add(ret);
} catch (UserException ue) {
Log.error("unexpected action data error when replay log ", ue);
}
}
this.workloadActionList = actionList;
}
}

View File

@ -0,0 +1,535 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.analysis.AlterWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.CreateWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(WorkloadSchedPolicyMgr.class);
@SerializedName(value = "idToPolicy")
private Map<Long, WorkloadSchedPolicy> idToPolicy = Maps.newConcurrentMap();
private Map<String, WorkloadSchedPolicy> nameToPolicy = Maps.newHashMap();
private PolicyProcNode policyProcNode = new PolicyProcNode();
public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("ItemName").add("ItemValue")
.build();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static Comparator<WorkloadSchedPolicy> policyComparator = new Comparator<WorkloadSchedPolicy>() {
@Override
public int compare(WorkloadSchedPolicy p1, WorkloadSchedPolicy p2) {
return p2.getPriority() - p1.getPriority();
}
};
private Thread policyExecThread = new Thread() {
@Override
public void run() {
while (true) {
try {
// todo(wb) add more query info source, not only comes from connectionmap
// 1 get query info map
Map<Integer, ConnectContext> connectMap = ExecuteEnv.getInstance().getScheduler()
.getConnectionMap();
List<WorkloadQueryInfo> queryInfoList = new ArrayList<>();
// a snapshot for connect context
Set<Integer> keySet = new HashSet<>();
keySet.addAll(connectMap.keySet());
for (Integer connectId : keySet) {
ConnectContext cctx = connectMap.get(connectId);
if (cctx == null || cctx.isKilled()) {
continue;
}
String username = cctx.getQualifiedUser();
long queryTime = System.currentTimeMillis() - cctx.getStartTime();
WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
policyQueryInfo.tUniqueId = cctx.queryId();
policyQueryInfo.context = cctx;
policyQueryInfo.metricMap = new HashMap<>();
policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
policyQueryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, String.valueOf(queryTime));
queryInfoList.add(policyQueryInfo);
}
// 2 exec policy
if (queryInfoList.size() > 0) {
execPolicy(queryInfoList);
}
} catch (Throwable t) {
LOG.error("[policy thread]error happens when exec policy");
}
// 3 sleep
try {
Thread.sleep(Config.workload_sched_policy_interval_ms);
} catch (InterruptedException e) {
LOG.error("error happends when policy exec thread sleep");
}
}
}
};
public void start() {
policyExecThread.setName("workload-auto-scheduler-thread");
policyExecThread.start();
}
public void createWorkloadSchedPolicy(CreateWorkloadSchedPolicyStmt createStmt) throws UserException {
String policyName = createStmt.getPolicyName();
// 1 create condition
List<WorkloadConditionMeta> originConditions = createStmt.getConditions();
List<WorkloadCondition> policyConditionList = new ArrayList<>();
for (WorkloadConditionMeta cm : originConditions) {
WorkloadCondition cond = WorkloadCondition.createWorkloadCondition(cm);
policyConditionList.add(cond);
}
// 2 create action
List<WorkloadActionMeta> originActions = createStmt.getActions();
List<WorkloadAction> policyActionList = new ArrayList<>();
for (WorkloadActionMeta workloadActionMeta : originActions) {
WorkloadActionType actionName = workloadActionMeta.action;
String actionArgs = workloadActionMeta.actionArgs;
// we need convert wgName to wgId, because wgName may change
if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(actionName)) {
Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(actionArgs);
if (wgId == null) {
throw new UserException(
"can not find workload group " + actionArgs + " when set workload sched policy");
}
workloadActionMeta.actionArgs = wgId.toString();
}
WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta);
policyActionList.add(ret);
}
checkPolicyActionConflicts(policyActionList);
// 3 create policy
Map<String, String> propMap = createStmt.getProperties();
if (propMap == null) {
propMap = new HashMap<>();
}
if (propMap.size() != 0) {
checkProperties(propMap);
}
writeLock();
try {
if (nameToPolicy.containsKey(createStmt.getPolicyName())) {
if (createStmt.isIfNotExists()) {
return;
} else {
throw new UserException("workload schedule policy " + policyName + " already exists ");
}
}
long id = Env.getCurrentEnv().getNextId();
WorkloadSchedPolicy policy = new WorkloadSchedPolicy(id, policyName,
policyConditionList, policyActionList, propMap);
policy.setConditionMeta(originConditions);
policy.setActionMeta(originActions);
Env.getCurrentEnv().getEditLog().logCreateWorkloadSchedPolicy(policy);
idToPolicy.put(id, policy);
nameToPolicy.put(policyName, policy);
} finally {
writeUnlock();
}
}
private void checkPolicyActionConflicts(List<WorkloadAction> actionList) throws UserException {
Set<WorkloadActionType> actionTypeSet = new HashSet<>();
Set<String> setSessionVarSet = new HashSet<>();
for (WorkloadAction action : actionList) {
// set session var cmd can be duplicate, but args can not be duplicate
if (action.getWorkloadActionType().equals(WorkloadActionType.SET_SESSION_VARIABLE)) {
WorkloadActionSetSessionVar setAction = (WorkloadActionSetSessionVar) action;
if (!setSessionVarSet.add(setAction.getVarName())) {
throw new UserException(
"duplicate set_session_variable action args one policy, " + setAction.getVarName());
}
} else if (!actionTypeSet.add(action.getWorkloadActionType())) {
throw new UserException("duplicate action in one policy");
}
}
if (actionTypeSet.contains(WorkloadActionType.CANCEL_QUERY) && actionTypeSet.contains(
WorkloadActionType.MOVE_QUERY_TO_GROUP)) {
throw new UserException(String.format("%s and %s can not exist in one policy at same time",
WorkloadActionType.CANCEL_QUERY, WorkloadActionType.MOVE_QUERY_TO_GROUP));
}
}
public void execPolicy(List<WorkloadQueryInfo> queryInfoList) {
// 1 get a snapshot of policy
Set<Long> policyIdSet = new HashSet<>();
readLock();
try {
policyIdSet.addAll(idToPolicy.keySet());
} finally {
readUnlock();
}
for (WorkloadQueryInfo queryInfo : queryInfoList) {
try {
// 1 check policy is match
Map<WorkloadActionType, Queue<WorkloadSchedPolicy>> matchedPolicyMap = Maps.newHashMap();
for (Long policyId : policyIdSet) {
WorkloadSchedPolicy policy = idToPolicy.get(policyId);
if (policy == null) {
continue;
}
if (policy.isEnabled() && policy.isMatch(queryInfo)) {
WorkloadActionType actionType = policy.getFirstActionType();
// add to priority queue
Queue<WorkloadSchedPolicy> queue = matchedPolicyMap.get(actionType);
if (queue == null) {
queue = new PriorityQueue<>(policyComparator);
matchedPolicyMap.put(actionType, queue);
}
queue.offer(policy);
}
}
if (matchedPolicyMap.size() == 0) {
continue;
}
// 2 pick higher priority policy when action conflicts
List<WorkloadSchedPolicy> pickedPolicyList = pickPolicy(matchedPolicyMap);
// 3 exec action
for (WorkloadSchedPolicy policy : pickedPolicyList) {
policy.execAction(queryInfo);
}
} catch (Throwable e) {
LOG.warn("exec policy with query {} failed ", queryInfo.queryId, e);
}
}
}
List<WorkloadSchedPolicy> pickPolicy(Map<WorkloadActionType, Queue<WorkloadSchedPolicy>> policyMap) {
// NOTE(wb) currently all action share the same comparator which use priority.
// But later we may design every action type's own comparator,
// such as if two move group action has the same priority but move to different group,
// then we may pick group by resource usage and query statistics.
// 1 only need one policy with move action which has the highest priority
WorkloadSchedPolicy policyWithMoveAction = null;
Queue<WorkloadSchedPolicy> moveActionQueue = policyMap.get(WorkloadActionType.MOVE_QUERY_TO_GROUP);
if (moveActionQueue != null) {
policyWithMoveAction = moveActionQueue.peek();
}
// 2 only need one policy with cancel action which has the highest priority
WorkloadSchedPolicy policyWithCancelQueryAction = null;
Queue<WorkloadSchedPolicy> canelQueryActionQueue = policyMap.get(WorkloadActionType.CANCEL_QUERY);
if (canelQueryActionQueue != null) {
policyWithCancelQueryAction = canelQueryActionQueue.peek();
}
// 3 compare policy with move action and cancel action
List<WorkloadSchedPolicy> ret = new ArrayList<>();
if (policyWithMoveAction != null && policyWithCancelQueryAction != null) {
if (policyWithMoveAction.getPriority() > policyWithCancelQueryAction.getPriority()) {
ret.add(policyWithMoveAction);
} else {
ret.add(policyWithCancelQueryAction);
}
} else {
if (policyWithCancelQueryAction != null) {
ret.add(policyWithCancelQueryAction);
} else if (policyWithMoveAction != null) {
ret.add(policyWithMoveAction);
}
}
// 4 add no-conflict policy
for (Map.Entry<WorkloadActionType, Queue<WorkloadSchedPolicy>> entry : policyMap.entrySet()) {
WorkloadActionType type = entry.getKey();
Queue<WorkloadSchedPolicy> policyQueue = entry.getValue();
if (!WorkloadActionType.CANCEL_QUERY.equals(type) && !WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(type)
&& policyQueue != null && policyQueue.size() > 0) {
WorkloadSchedPolicy pickedPolicy = policyQueue.peek();
ret.add(pickedPolicy);
}
}
Preconditions.checkArgument(ret.size() > 0, "should pick at least one policy");
return ret;
}
private void checkProperties(Map<String, String> properties) throws UserException {
Set<String> allInputPropKeySet = new HashSet<>();
allInputPropKeySet.addAll(properties.keySet());
allInputPropKeySet.removeAll(WorkloadSchedPolicy.POLICY_PROPERTIES);
if (allInputPropKeySet.size() > 0) {
throw new UserException("illegal policy properties " + String.join(",", allInputPropKeySet));
}
String enabledStr = properties.get(WorkloadSchedPolicy.ENABLED);
if (enabledStr != null) {
if (!"true".equals(enabledStr) && !"false".equals(enabledStr)) {
throw new UserException("invalid enabled property value, it can only true or false with lower case");
}
}
String priorityStr = properties.get(WorkloadSchedPolicy.PRIORITY);
if (priorityStr != null) {
try {
Long prioLongVal = Long.parseLong(priorityStr);
if (prioLongVal < 0 || prioLongVal > 100) {
throw new UserException("policy's priority can only between 0 ~ 100");
}
} catch (NumberFormatException e) {
throw new UserException("policy's priority must be a number, input value=" + priorityStr);
}
}
}
public void alterWorkloadSchedPolicy(AlterWorkloadSchedPolicyStmt alterStmt) throws UserException {
writeLock();
try {
String policyName = alterStmt.getPolicyName();
WorkloadSchedPolicy policy = nameToPolicy.get(policyName);
if (policy == null) {
throw new UserException("can not find workload schedule policy " + policyName);
}
Map<String, String> properties = alterStmt.getProperties();
checkProperties(properties);
policy.parseAndSetProperties(properties);
policy.incrementVersion();
Env.getCurrentEnv().getEditLog().logAlterWorkloadSchedPolicy(policy);
} finally {
writeUnlock();
}
}
public void dropWorkloadSchedPolicy(DropWorkloadSchedPolicyStmt dropStmt) throws UserException {
writeLock();
try {
String policyName = dropStmt.getPolicyName();
WorkloadSchedPolicy schedPolicy = nameToPolicy.get(policyName);
if (schedPolicy == null) {
if (dropStmt.isIfExists()) {
return;
} else {
throw new UserException("workload schedule policy " + policyName + " not exists");
}
}
long id = schedPolicy.getId();
idToPolicy.remove(id);
nameToPolicy.remove(policyName);
Env.getCurrentEnv().getEditLog().dropWorkloadSchedPolicy(id);
} finally {
writeUnlock();
}
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}
public void replayAlterWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}
public void replayDropWorkloadSchedPolicy(long policyId) {
writeLock();
try {
WorkloadSchedPolicy policy = idToPolicy.get(policyId);
if (policy == null) {
return;
}
idToPolicy.remove(policyId);
nameToPolicy.remove(policy.getName());
} finally {
writeUnlock();
}
}
private void insertWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
writeLock();
try {
idToPolicy.put(policy.getId(), policy);
nameToPolicy.put(policy.getName(), policy);
} finally {
writeUnlock();
}
}
public List<List<String>> getShowPolicyInfo() {
UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}
public class PolicyProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
result.setNames(WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES);
readLock();
try {
for (WorkloadSchedPolicy policy : idToPolicy.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkWorkloadGroupPriv(currentUserIdentity,
policy.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) {
continue;
}
String pId = String.valueOf(policy.getId());
String pName = policy.getName();
List<WorkloadConditionMeta> conditionList = policy.getConditionMetaList();
for (WorkloadConditionMeta cm : conditionList) {
List<String> condRow = new ArrayList<>();
condRow.add(pId);
condRow.add(pName);
condRow.add("condition");
condRow.add(cm.toString());
result.addRow(condRow);
}
List<WorkloadActionMeta> actionList = policy.getActionMetaList();
for (WorkloadActionMeta workloadActionMeta : actionList) {
List<String> actionRow = new ArrayList<>();
actionRow.add(pId);
actionRow.add(pName);
actionRow.add("action");
if (StringUtils.isEmpty(workloadActionMeta.actionArgs)) {
actionRow.add(workloadActionMeta.action.toString());
} else {
actionRow.add(workloadActionMeta.action + " " + workloadActionMeta.actionArgs);
}
result.addRow(actionRow);
}
List<String> prioRow = new ArrayList<>();
prioRow.add(pId);
prioRow.add(pName);
prioRow.add("priority");
prioRow.add(String.valueOf(policy.getPriority()));
result.addRow(prioRow);
List<String> enabledRow = new ArrayList<>();
enabledRow.add(pId);
enabledRow.add(pName);
enabledRow.add("enabled");
enabledRow.add(String.valueOf(policy.isEnabled()));
result.addRow(enabledRow);
List<String> versionRow = new ArrayList<>();
versionRow.add(pId);
versionRow.add(pName);
versionRow.add("version");
versionRow.add(String.valueOf(policy.getVersion()));
result.addRow(versionRow);
}
} finally {
readUnlock();
}
return result;
}
}
public static WorkloadSchedPolicyMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, WorkloadSchedPolicyMgr.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
@Override
public void gsonPostProcess() throws IOException {
idToPolicy.forEach(
(id, schedPolicy) -> nameToPolicy.put(schedPolicy.getName(), schedPolicy));
}
}