[Improment](executor)Add scanbytes/scanrows condition (#31364)

* Add scanbytes/scanrows condition

* fix reg
This commit is contained in:
wangbo
2024-02-26 19:01:35 +08:00
committed by yiguolei
parent b8fe620ba3
commit 1127b0065a
22 changed files with 161 additions and 265 deletions

View File

@ -111,7 +111,6 @@ import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadActionPublishThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DynamicPartitionUtil;
@ -522,8 +521,6 @@ public class Env {
private TopicPublisherThread topicPublisherThread;
private WorkloadActionPublishThread workloadActionPublisherThread;
private MTMVService mtmvService;
private InsertOverwriteManager insertOverwriteManager;
@ -761,8 +758,6 @@ public class Env {
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
this.topicPublisherThread = new TopicPublisherThread(
"TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo);
this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher",
Config.workload_action_interval_ms, systemInfo);
this.mtmvService = new MTMVService();
this.insertOverwriteManager = new InsertOverwriteManager();
}
@ -1042,7 +1037,6 @@ public class Env {
workloadGroupMgr.startUpdateThread();
workloadSchedPolicyMgr.start();
workloadActionPublisherThread.start();
workloadRuntimeStatusMgr.start();
}

View File

@ -1,123 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class WorkloadActionPublishThread extends Daemon {
private ExecutorService executor = ThreadPoolManager
.newDaemonFixedThreadPool(4, 256, "workload-action-publish-thread", true);
private static final Logger LOG = LogManager.getLogger(WorkloadActionPublishThread.class);
public static Map<TTopicInfoType, List<TopicInfo>> workloadActionToplicInfoMap
= new HashMap<TTopicInfoType, List<TopicInfo>>();
public static synchronized void putWorkloadAction(TTopicInfoType type, TopicInfo topicInfo) {
List<TopicInfo> list = workloadActionToplicInfoMap.get(type);
if (list == null) {
list = new ArrayList<TopicInfo>();
workloadActionToplicInfoMap.put(type, list);
}
list.add(topicInfo);
}
public static synchronized Map<TTopicInfoType, List<TopicInfo>> getCurrentWorkloadActionMap() {
Map<TTopicInfoType, List<TopicInfo>> retMap = workloadActionToplicInfoMap;
workloadActionToplicInfoMap = new HashMap<TTopicInfoType, List<TopicInfo>>();
return retMap;
}
private SystemInfoService clusterInfoService;
public WorkloadActionPublishThread(String name, long intervalMs,
SystemInfoService clusterInfoService) {
super(name, intervalMs);
this.clusterInfoService = clusterInfoService;
}
@Override
protected final void runOneCycle() {
Map<TTopicInfoType, List<TopicInfo>> actionMap
= WorkloadActionPublishThread.getCurrentWorkloadActionMap();
if (actionMap.size() == 0) {
LOG.info("no workload action found, skip publish");
return;
}
Collection<Backend> currentBeToPublish = clusterInfoService.getIdToBackend().values();
AckResponseHandler handler = new AckResponseHandler(currentBeToPublish);
TPublishTopicRequest request = new TPublishTopicRequest();
request.setTopicMap(actionMap);
for (Backend be : currentBeToPublish) {
executor.submit(new WorkloadMoveActionTask(request, be, handler));
}
}
public class WorkloadMoveActionTask implements Runnable {
private TPublishTopicRequest request;
private Backend be;
private ResponseHandler handler;
public WorkloadMoveActionTask(TPublishTopicRequest request, Backend be,
ResponseHandler handler) {
this.request = request;
this.be = be;
this.handler = handler;
}
@Override
public void run() {
long beginTime = System.currentTimeMillis();
try {
TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort());
BackendService.Client client = ClientPool.backendPool.borrowObject(addr);
client.publishTopicInfo(request);
LOG.info("publish move action topic to be {} success, time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime));
} catch (Exception e) {
LOG.warn("publish move action topic to be {} error happens: , time cost={} ms",
be.getHost(), (System.currentTimeMillis() - beginTime), e);
} finally {
handler.onResponse(be);
}
}
}
}

View File

@ -30,8 +30,6 @@ public interface WorkloadAction {
throws UserException {
if (WorkloadActionType.CANCEL_QUERY.equals(workloadActionMeta.action)) {
return WorkloadActionCancelQuery.createWorkloadAction();
} else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(workloadActionMeta.action)) {
return WorkloadActionMoveQueryToGroup.createWorkloadAction(workloadActionMeta.actionArgs);
} else if (WorkloadActionType.SET_SESSION_VARIABLE.equals(workloadActionMeta.action)) {
return WorkloadActionSetSessionVar.createWorkloadAction(workloadActionMeta.actionArgs);
}

View File

@ -1,67 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.publish.WorkloadActionPublishThread;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TWorkloadMoveQueryToGroupAction;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class WorkloadActionMoveQueryToGroup implements WorkloadAction {
private static final Logger LOG = LogManager.getLogger(WorkloadActionMoveQueryToGroup.class);
private long dstWgId;
public WorkloadActionMoveQueryToGroup(long dstWgId) {
this.dstWgId = dstWgId;
}
@Override
public void exec(WorkloadQueryInfo queryInfo) {
if (queryInfo.context != null && !queryInfo.context.isKilled()
&& queryInfo.tUniqueId != null
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("try move query {} to group {}", queryInfo.queryId, dstWgId);
TWorkloadMoveQueryToGroupAction moveQueryToGroupAction = new TWorkloadMoveQueryToGroupAction();
moveQueryToGroupAction.setQueryId(queryInfo.tUniqueId);
moveQueryToGroupAction.setWorkloadGroupId(dstWgId);
TopicInfo topicInfo = new TopicInfo();
topicInfo.setMoveAction(moveQueryToGroupAction);
WorkloadActionPublishThread.putWorkloadAction(TTopicInfoType.MOVE_QUERY_TO_GROUP, topicInfo);
}
}
@Override
public WorkloadActionType getWorkloadActionType() {
return WorkloadActionType.MOVE_QUERY_TO_GROUP;
}
public static WorkloadActionMoveQueryToGroup createWorkloadAction(String groupId) {
long wgId = Long.parseLong(groupId);
return new WorkloadActionMoveQueryToGroup(wgId);
}
}

View File

@ -33,6 +33,10 @@ public interface WorkloadCondition {
return WorkloadConditionUsername.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) {
return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.BE_SCAN_ROWS.equals(cm.metricName)) {
return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value);
}
throw new UserException("invalid metric name:" + cm.metricName);
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public class WorkloadConditionBeScanBytes implements WorkloadCondition {
private long value;
private WorkloadConditionOperator op;
public WorkloadConditionBeScanBytes(WorkloadConditionOperator op, long value) {
this.op = op;
this.value = value;
}
@Override
public boolean eval(String strValue) {
// currently not support run in fe, so this condition never match
return false;
}
public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0");
}
return new WorkloadConditionBeScanBytes(op, longValue);
}
@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.BE_SCAN_BYTES;
}
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public class WorkloadConditionBeScanRows implements WorkloadCondition {
private long value;
private WorkloadConditionOperator op;
public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long value) {
this.op = op;
this.value = value;
}
@Override
public boolean eval(String strValue) {
// currently not support run in fe, so this condition never match
return false;
}
public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0");
}
return new WorkloadConditionBeScanRows(op, longValue);
}
@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.BE_SCAN_ROWS;
}
}

View File

@ -44,6 +44,10 @@ public class WorkloadConditionMeta {
return WorkloadMetricType.USERNAME;
} else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.QUERY_TIME;
} else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_ROWS;
} else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_BYTES;
}
throw new UserException("invalid metric name:" + metricStr);
}

View File

@ -18,5 +18,5 @@
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadMetricType {
USERNAME, QUERY_TIME, SCAN_ROWS, SCAN_BYTES
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
}

View File

@ -52,7 +52,9 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
// used for convert fe type to thrift type
private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME).build();
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build();
private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)

View File

@ -86,9 +86,8 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
.add(WorkloadActionType.CANCEL_QUERY).build();
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.SCAN_ROWS)
.add(WorkloadMetricType.SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
.build();
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -122,15 +121,12 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
}
String username = cctx.getQualifiedUser();
long queryTime = System.currentTimeMillis() - cctx.getStartTime();
WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo();
policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId());
policyQueryInfo.tUniqueId = cctx.queryId();
policyQueryInfo.context = cctx;
policyQueryInfo.metricMap = new HashMap<>();
policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username);
policyQueryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, String.valueOf(queryTime));
queryInfoList.add(policyQueryInfo);
}
@ -174,19 +170,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {
List<WorkloadActionMeta> originActions = createStmt.getActions();
List<WorkloadAction> policyActionList = new ArrayList<>();
for (WorkloadActionMeta workloadActionMeta : originActions) {
WorkloadActionType actionName = workloadActionMeta.action;
String actionArgs = workloadActionMeta.actionArgs;
// we need convert wgName to wgId, because wgName may change
if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(actionName)) {
Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(actionArgs);
if (wgId == null) {
throw new UserException(
"can not find workload group " + actionArgs + " when set workload sched policy");
}
workloadActionMeta.actionArgs = wgId.toString();
}
// todo(wb) support move action
WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta);
policyActionList.add(ret);
}