diff --git a/be/src/agent/workload_sched_policy_listener.cpp b/be/src/agent/workload_sched_policy_listener.cpp index 461fd2cbb0..689a600367 100644 --- a/be/src/agent/workload_sched_policy_listener.cpp +++ b/be/src/agent/workload_sched_policy_listener.cpp @@ -69,9 +69,7 @@ void WorkloadschedPolicyListener::handle_topic_info(const std::vector policy_map.emplace(tpolicy.id, std::move(policy_ptr)); } size_t new_policy_size = policy_map.size(); - if (new_policy_size > 0) { - _exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map)); - } + _exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map)); LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size=" << new_policy_size; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f698553885..e52b71d277 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1506,17 +1506,10 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { { std::lock_guard lock(_lock); - // todo: use monotonic time - VecDateTimeValue now = VecDateTimeValue::local_time(); for (const auto& q : _query_ctx_map) { WorkloadQueryInfo workload_query_info; workload_query_info.query_id = print_id(q.first); workload_query_info.tquery_id = q.first; - - uint64_t query_time_millisecond = q.second->query_time(now) * 1000; - workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME, - std::to_string(query_time_millisecond)); - // todo, add scan rows, scan bytes query_info_list->push_back(workload_query_info); } } diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index a658e527f6..c6f70643f1 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -172,6 +172,25 @@ std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_sta return qs_ptr; } +void RuntimeQueryStatiticsMgr::get_metric_map( + std::string query_id, std::map& metric_map) { + QueryStatistics ret_qs; + int64_t query_time_ms = 0; + { + std::shared_lock read_lock(_qs_ctx_map_lock); + if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { + for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) { + ret_qs.merge(*qs); + } + query_time_ms = + MonotonicMillis() - _query_statistics_ctx_map.at(query_id)->_query_start_time; + } + } + metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms)); + metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows())); + metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes())); +} + void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { // wg id just need eventual consistency, read lock is ok std::shared_lock read_lock(_qs_ctx_map_lock); diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 98d4f55472..69b283b6d1 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -21,6 +21,8 @@ #include #include "runtime/query_statistics.h" +#include "runtime/workload_management/workload_condition.h" +#include "util/time.h" namespace doris { @@ -29,6 +31,7 @@ public: QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) { this->_is_query_finished = false; this->_wg_id = -1; + this->_query_start_time = MonotonicMillis(); } ~QueryStatisticsCtx() = default; @@ -40,6 +43,7 @@ public: TNetworkAddress _fe_addr; int64_t _query_finish_time; int64_t _wg_id; + int64_t _query_start_time; }; class RuntimeQueryStatiticsMgr { @@ -58,6 +62,10 @@ public: void set_workload_group_id(std::string query_id, int64_t wg_id); + // used for workload scheduler policy + void get_metric_map(std::string query_id, + std::map& metric_map); + private: std::shared_mutex _qs_ctx_map_lock; std::map> _query_statistics_ctx_map; diff --git a/be/src/runtime/workload_management/workload_action.h b/be/src/runtime/workload_management/workload_action.h index 29c01320b7..785acc73c3 100644 --- a/be/src/runtime/workload_management/workload_action.h +++ b/be/src/runtime/workload_management/workload_action.h @@ -55,9 +55,7 @@ private: class WorkloadActionFactory { public: static std::unique_ptr create_workload_action(TWorkloadAction* action) { - if (TWorkloadActionType::type::MOVE_QUERY_TO_GROUP == action->action) { - return std::make_unique(action->action_args); - } else if (TWorkloadActionType::type::CANCEL_QUERY == action->action) { + if (TWorkloadActionType::type::CANCEL_QUERY == action->action) { return std::make_unique(); } LOG(ERROR) << "not find a action " << action->action; diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 3486742c96..96387a2af4 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -84,9 +84,9 @@ public: TWorkloadMetricType::type metric_name = t_cond->metric_name; if (TWorkloadMetricType::type::QUERY_TIME == metric_name) { return std::make_unique(op, str_val); - } else if (TWorkloadMetricType::type::SCAN_ROWS == metric_name) { + } else if (TWorkloadMetricType::type::BE_SCAN_ROWS == metric_name) { return std::make_unique(op, str_val); - } else if (TWorkloadMetricType::type::SCAN_BYTES == metric_name) { + } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique(op, str_val); } LOG(ERROR) << "not find a metric name " << metric_name; diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp index 731dd0c866..c41a9e723e 100644 --- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -18,6 +18,7 @@ #include "runtime/workload_management/workload_sched_policy_mgr.h" #include "runtime/fragment_mgr.h" +#include "runtime/runtime_query_statistics_mgr.h" namespace doris { @@ -76,6 +77,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) { // 1 get query info std::vector list; + //todo(wb) maybe we can get runtime queryinfo from RuntimeQueryStatiticsMgr directly _exec_env->fragment_mgr()->get_runtime_query_info(&list); // todo: add timer if (list.size() == 0) { @@ -84,6 +86,9 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { for (int i = 0; i < list.size(); i++) { WorkloadQueryInfo* query_info_ptr = &(list[i]); + _exec_env->runtime_query_statistics_mgr()->get_metric_map(query_info_ptr->query_id, + query_info_ptr->metric_map); + // 2 get matched policy std::map> matched_policy_map; { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 541592f209..0d89becbb2 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2391,9 +2391,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int workload_sched_policy_interval_ms = 10000; // 10s - @ConfField(mutable = true) - public static int workload_action_interval_ms = 10000; // 10s - @ConfField(mutable = true, masterOnly = true) public static int workload_max_policy_num = 25; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8c65a71daf..76ff70589b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -111,7 +111,6 @@ import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; import org.apache.doris.common.publish.TopicPublisher; import org.apache.doris.common.publish.TopicPublisherThread; -import org.apache.doris.common.publish.WorkloadActionPublishThread; import org.apache.doris.common.publish.WorkloadGroupPublisher; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.DynamicPartitionUtil; @@ -522,8 +521,6 @@ public class Env { private TopicPublisherThread topicPublisherThread; - private WorkloadActionPublishThread workloadActionPublisherThread; - private MTMVService mtmvService; private InsertOverwriteManager insertOverwriteManager; @@ -761,8 +758,6 @@ public class Env { this.queryCancelWorker = new QueryCancelWorker(systemInfo); this.topicPublisherThread = new TopicPublisherThread( "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); - this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher", - Config.workload_action_interval_ms, systemInfo); this.mtmvService = new MTMVService(); this.insertOverwriteManager = new InsertOverwriteManager(); } @@ -1042,7 +1037,6 @@ public class Env { workloadGroupMgr.startUpdateThread(); workloadSchedPolicyMgr.start(); - workloadActionPublisherThread.start(); workloadRuntimeStatusMgr.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java deleted file mode 100644 index cacd7a9da9..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadActionPublishThread.java +++ /dev/null @@ -1,123 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.publish; - -import org.apache.doris.common.ClientPool; -import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.Daemon; -import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.BackendService; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TTopicInfoType; -import org.apache.doris.thrift.TopicInfo; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -public class WorkloadActionPublishThread extends Daemon { - - private ExecutorService executor = ThreadPoolManager - .newDaemonFixedThreadPool(4, 256, "workload-action-publish-thread", true); - - private static final Logger LOG = LogManager.getLogger(WorkloadActionPublishThread.class); - - public static Map> workloadActionToplicInfoMap - = new HashMap>(); - - public static synchronized void putWorkloadAction(TTopicInfoType type, TopicInfo topicInfo) { - List list = workloadActionToplicInfoMap.get(type); - if (list == null) { - list = new ArrayList(); - workloadActionToplicInfoMap.put(type, list); - } - list.add(topicInfo); - } - - public static synchronized Map> getCurrentWorkloadActionMap() { - Map> retMap = workloadActionToplicInfoMap; - workloadActionToplicInfoMap = new HashMap>(); - return retMap; - } - - private SystemInfoService clusterInfoService; - - public WorkloadActionPublishThread(String name, long intervalMs, - SystemInfoService clusterInfoService) { - super(name, intervalMs); - this.clusterInfoService = clusterInfoService; - } - - @Override - protected final void runOneCycle() { - Map> actionMap - = WorkloadActionPublishThread.getCurrentWorkloadActionMap(); - if (actionMap.size() == 0) { - LOG.info("no workload action found, skip publish"); - return; - } - Collection currentBeToPublish = clusterInfoService.getIdToBackend().values(); - AckResponseHandler handler = new AckResponseHandler(currentBeToPublish); - TPublishTopicRequest request = new TPublishTopicRequest(); - request.setTopicMap(actionMap); - for (Backend be : currentBeToPublish) { - executor.submit(new WorkloadMoveActionTask(request, be, handler)); - } - } - - public class WorkloadMoveActionTask implements Runnable { - - private TPublishTopicRequest request; - - private Backend be; - - private ResponseHandler handler; - - public WorkloadMoveActionTask(TPublishTopicRequest request, Backend be, - ResponseHandler handler) { - this.request = request; - this.be = be; - this.handler = handler; - } - - @Override - public void run() { - long beginTime = System.currentTimeMillis(); - try { - TNetworkAddress addr = new TNetworkAddress(be.getHost(), be.getBePort()); - BackendService.Client client = ClientPool.backendPool.borrowObject(addr); - client.publishTopicInfo(request); - LOG.info("publish move action topic to be {} success, time cost={} ms", - be.getHost(), (System.currentTimeMillis() - beginTime)); - } catch (Exception e) { - LOG.warn("publish move action topic to be {} error happens: , time cost={} ms", - be.getHost(), (System.currentTimeMillis() - beginTime), e); - } finally { - handler.onResponse(be); - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java index d929878147..661ea6a45f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadAction.java @@ -30,8 +30,6 @@ public interface WorkloadAction { throws UserException { if (WorkloadActionType.CANCEL_QUERY.equals(workloadActionMeta.action)) { return WorkloadActionCancelQuery.createWorkloadAction(); - } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(workloadActionMeta.action)) { - return WorkloadActionMoveQueryToGroup.createWorkloadAction(workloadActionMeta.actionArgs); } else if (WorkloadActionType.SET_SESSION_VARIABLE.equals(workloadActionMeta.action)) { return WorkloadActionSetSessionVar.createWorkloadAction(workloadActionMeta.actionArgs); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java deleted file mode 100644 index 59e09345fa..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMoveQueryToGroup.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.resource.workloadschedpolicy; - -import org.apache.doris.common.publish.WorkloadActionPublishThread; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.thrift.TTopicInfoType; -import org.apache.doris.thrift.TWorkloadMoveQueryToGroupAction; -import org.apache.doris.thrift.TopicInfo; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class WorkloadActionMoveQueryToGroup implements WorkloadAction { - - private static final Logger LOG = LogManager.getLogger(WorkloadActionMoveQueryToGroup.class); - - private long dstWgId; - - public WorkloadActionMoveQueryToGroup(long dstWgId) { - this.dstWgId = dstWgId; - } - - @Override - public void exec(WorkloadQueryInfo queryInfo) { - if (queryInfo.context != null && !queryInfo.context.isKilled() - && queryInfo.tUniqueId != null - && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { - LOG.info("try move query {} to group {}", queryInfo.queryId, dstWgId); - - TWorkloadMoveQueryToGroupAction moveQueryToGroupAction = new TWorkloadMoveQueryToGroupAction(); - moveQueryToGroupAction.setQueryId(queryInfo.tUniqueId); - moveQueryToGroupAction.setWorkloadGroupId(dstWgId); - - TopicInfo topicInfo = new TopicInfo(); - topicInfo.setMoveAction(moveQueryToGroupAction); - - WorkloadActionPublishThread.putWorkloadAction(TTopicInfoType.MOVE_QUERY_TO_GROUP, topicInfo); - } - } - - @Override - public WorkloadActionType getWorkloadActionType() { - return WorkloadActionType.MOVE_QUERY_TO_GROUP; - } - - public static WorkloadActionMoveQueryToGroup createWorkloadAction(String groupId) { - long wgId = Long.parseLong(groupId); - return new WorkloadActionMoveQueryToGroup(wgId); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index 1f75d81794..5d89d2afae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -33,6 +33,10 @@ public interface WorkloadCondition { return WorkloadConditionUsername.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.QUERY_TIME.equals(cm.metricName)) { return WorkloadConditionQueryTime.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.BE_SCAN_ROWS.equals(cm.metricName)) { + return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { + return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); } throw new UserException("invalid metric name:" + cm.metricName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java new file mode 100644 index 0000000000..7431f2e0c4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public class WorkloadConditionBeScanBytes implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionBeScanBytes(WorkloadConditionOperator op, long value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String strValue) { + // currently not support run in fe, so this condition never match + return false; + } + + public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value) + throws UserException { + long longValue = Long.parseLong(value); + if (longValue < 0) { + throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0"); + } + return new WorkloadConditionBeScanBytes(op, longValue); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.BE_SCAN_BYTES; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java new file mode 100644 index 0000000000..c2fb638e08 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +public class WorkloadConditionBeScanRows implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String strValue) { + // currently not support run in fe, so this condition never match + return false; + } + + public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value) + throws UserException { + long longValue = Long.parseLong(value); + if (longValue < 0) { + throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0"); + } + return new WorkloadConditionBeScanRows(op, longValue); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.BE_SCAN_ROWS; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java index d5d2f922f3..52f50f924f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -44,6 +44,10 @@ public class WorkloadConditionMeta { return WorkloadMetricType.USERNAME; } else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) { return WorkloadMetricType.QUERY_TIME; + } else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) { + return WorkloadMetricType.BE_SCAN_ROWS; + } else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) { + return WorkloadMetricType.BE_SCAN_BYTES; } throw new UserException("invalid metric name:" + metricStr); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index f81d75c675..ed17414ec4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,5 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, SCAN_ROWS, SCAN_BYTES + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 13637b5b8b..8b028e3e75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -52,7 +52,9 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { // used for convert fe type to thrift type private static ImmutableMap METRIC_MAP = new ImmutableMap.Builder() - .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME).build(); + .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) + .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) + .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build(); private static ImmutableMap ACTION_MAP = new ImmutableMap.Builder() .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 0074020735..5c35d1ee09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -86,9 +86,8 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { .add(WorkloadActionType.CANCEL_QUERY).build(); public static final ImmutableSet BE_METRIC_SET - = new ImmutableSet.Builder().add(WorkloadMetricType.SCAN_ROWS) - .add(WorkloadMetricType.SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) - .build(); + = new ImmutableSet.Builder().add(WorkloadMetricType.BE_SCAN_ROWS) + .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -122,15 +121,12 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { } String username = cctx.getQualifiedUser(); - long queryTime = System.currentTimeMillis() - cctx.getStartTime(); - WorkloadQueryInfo policyQueryInfo = new WorkloadQueryInfo(); policyQueryInfo.queryId = cctx.queryId() == null ? null : DebugUtil.printId(cctx.queryId()); policyQueryInfo.tUniqueId = cctx.queryId(); policyQueryInfo.context = cctx; policyQueryInfo.metricMap = new HashMap<>(); policyQueryInfo.metricMap.put(WorkloadMetricType.USERNAME, username); - policyQueryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, String.valueOf(queryTime)); queryInfoList.add(policyQueryInfo); } @@ -174,19 +170,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { List originActions = createStmt.getActions(); List policyActionList = new ArrayList<>(); for (WorkloadActionMeta workloadActionMeta : originActions) { - WorkloadActionType actionName = workloadActionMeta.action; - String actionArgs = workloadActionMeta.actionArgs; - - // we need convert wgName to wgId, because wgName may change - if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(actionName)) { - Long wgId = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroupIdByName(actionArgs); - if (wgId == null) { - throw new UserException( - "can not find workload group " + actionArgs + " when set workload sched policy"); - } - workloadActionMeta.actionArgs = wgId.toString(); - } - + // todo(wb) support move action WorkloadAction ret = WorkloadAction.createWorkloadAction(workloadActionMeta); policyActionList.add(ret); } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index c59abd65d3..24edaefc10 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -179,15 +179,10 @@ struct TWorkloadGroupInfo { 9: optional i32 scan_thread_num } -struct TWorkloadMoveQueryToGroupAction { - 1: optional Types.TUniqueId query_id - 2: optional i64 workload_group_id -} - enum TWorkloadMetricType { QUERY_TIME - SCAN_ROWS - SCAN_BYTES + BE_SCAN_ROWS + BE_SCAN_BYTES } enum TCompareOperator { @@ -226,8 +221,7 @@ struct TWorkloadSchedPolicy { struct TopicInfo { 1: optional TWorkloadGroupInfo workload_group_info - 2: optional TWorkloadMoveQueryToGroupAction move_action - 3: optional TWorkloadSchedPolicy workload_sched_policy + 2: optional TWorkloadSchedPolicy workload_sched_policy } struct TPublishTopicRequest { diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out index 0c86700866..d32fff321e 100644 --- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out +++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out @@ -2,7 +2,6 @@ -- !select_policy_tvf -- be_policy query_time > 10 cancel_query 10 false 0 fe_policy username = root set_session_variable "workload_group=normal" 10 false 0 -move_action_policy query_time > 10 move_query_to_group "normal" 0 true 0 set_action_policy username = root set_session_variable "workload_group=normal" 0 true 0 test_cancel_policy query_time > 10 cancel_query 0 false 0 diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index cabba8ee00..603bbdf520 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -20,7 +20,6 @@ suite("test_workload_sched_policy") { sql "set experimental_enable_nereids_planner = false;" sql "drop workload schedule policy if exists test_cancel_policy;" - sql "drop workload schedule policy if exists move_action_policy;" sql "drop workload schedule policy if exists set_action_policy;" sql "drop workload schedule policy if exists fe_policy;" sql "drop workload schedule policy if exists be_policy;" @@ -30,17 +29,12 @@ suite("test_workload_sched_policy") { " conditions(query_time > 10) " + " actions(cancel_query) properties('enabled'='false'); " - // 2 create cancel policy - sql "create workload schedule policy move_action_policy " + - "conditions(query_time > 10) " + - "actions(move_query_to_group 'normal');" - - // 3 create set policy + // 2 create set policy sql "create workload schedule policy set_action_policy " + "conditions(username='root') " + "actions(set_session_variable 'workload_group=normal');" - // 4 create policy run in fe + // 3 create policy run in fe sql "create workload schedule policy fe_policy " + "conditions(username='root') " + "actions(set_session_variable 'workload_group=normal') " + @@ -49,7 +43,7 @@ suite("test_workload_sched_policy") { "'priority'='10' " + ");" - // 5 create policy run in be + // 4 create policy run in be sql "create workload schedule policy be_policy " + "conditions(query_time > 10) " + "actions(cancel_query) " + @@ -96,14 +90,6 @@ suite("test_workload_sched_policy") { exception "priority can only between" } - test { - sql "create workload schedule policy conflict_policy " + - "conditions (query_time > 0)" + - "actions(cancel_query, move_query_to_group 'normal');" - - exception "can not exist in one policy at same time" - } - test { sql "create workload schedule policy conflict_policy " + "conditions (query_time > 0) " + @@ -122,7 +108,6 @@ suite("test_workload_sched_policy") { // drop sql "drop workload schedule policy test_cancel_policy;" - sql "drop workload schedule policy move_action_policy;" sql "drop workload schedule policy set_action_policy;" sql "drop workload schedule policy fe_policy;" sql "drop workload schedule policy be_policy;"