[cherry-pick]Add workload metric query_be_memory (#35911)

This commit is contained in:
wangbo
2024-06-06 14:33:30 +08:00
committed by GitHub
parent b6ab0c4e59
commit 5cecbfc6ea
18 changed files with 214 additions and 55 deletions

View File

@ -190,6 +190,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms));
metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows()));
metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes()));
metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
std::to_string(ret_qs.get_current_used_memory_bytes()));
}
void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {

View File

@ -22,10 +22,14 @@
namespace doris {
void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id;
std::stringstream msg;
msg << "query " << query_info->query_id
<< " cancelled by workload policy: " << query_info->policy_name
<< ", id:" << query_info->policy_id;
std::string msg_str = msg.str();
LOG(INFO) << "[workload_schedule]" << msg_str;
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
std::string("query canceled by workload scheduler"));
query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR, msg_str);
}
void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {

View File

@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) {
return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes);
}
// query memory
WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op,
std::string str_val) {
_op = op;
_query_memory_bytes = std::stol(str_val);
}
bool WorkloadConditionQueryMemory::eval(std::string str_val) {
int64_t query_memory_bytes = std::stol(str_val);
return WorkloadCompareUtils::compare_signed_integer(_op, query_memory_bytes,
_query_memory_bytes);
}
} // namespace doris

View File

@ -23,7 +23,7 @@
namespace doris {
enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };
enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES };
class WorkloadCondition {
public:
@ -74,6 +74,19 @@ private:
WorkloadCompareOperator _op;
};
class WorkloadConditionQueryMemory : public WorkloadCondition {
public:
WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val);
bool eval(std::string str_val) override;
WorkloadMetricType get_workload_metric_type() override {
return WorkloadMetricType::QUERY_MEMORY_BYTES;
}
private:
int64_t _query_memory_bytes;
WorkloadCompareOperator _op;
};
class WorkloadConditionFactory {
public:
static std::unique_ptr<WorkloadCondition> create_workload_condition(
@ -88,6 +101,8 @@ public:
return std::make_unique<WorkloadConditionScanRows>(op, str_val);
} else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
} else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) {
return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
}
LOG(ERROR) << "not find a metric name " << metric_name;
return nullptr;

View File

@ -29,6 +29,8 @@ public:
TUniqueId tquery_id;
std::string query_id;
int64_t wg_id;
int64_t policy_id;
std::string policy_name;
};
} // namespace doris

View File

@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
for (int i = 0; i < _action_list.size(); i++) {
query_info->policy_id = this->_id;
query_info->policy_name = this->_name;
_action_list[i]->exec(query_info);
}
}

View File

@ -37,14 +37,12 @@ public class WorkloadActionMeta {
}
static WorkloadActionType getWorkloadActionType(String strType) throws UserException {
if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.CANCEL_QUERY;
} else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.MOVE_QUERY_TO_GROUP;
} else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) {
return WorkloadActionType.SET_SESSION_VARIABLE;
WorkloadActionType workloadActionType = WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase());
if (workloadActionType == null) {
throw new UserException("invalid action type " + strType);
} else {
return workloadActionType;
}
throw new UserException("invalid action type " + strType);
}
public String toString() {

View File

@ -37,6 +37,8 @@ public interface WorkloadCondition {
return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value);
} else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) {
return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value);
}
throw new UserException("invalid metric name:" + cm.metricName);
}

View File

@ -38,9 +38,14 @@ public class WorkloadConditionBeScanBytes implements WorkloadCondition {
public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid scan bytes value: " + value + ", it requires >= 0");
}
return new WorkloadConditionBeScanBytes(op, longValue);
}

View File

@ -38,9 +38,14 @@ public class WorkloadConditionBeScanRows implements WorkloadCondition {
public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid scan rows value: " + value + ", it requires >= 0");
}
return new WorkloadConditionBeScanRows(op, longValue);
}

View File

@ -40,16 +40,12 @@ public class WorkloadConditionMeta {
}
private static WorkloadMetricType getMetricType(String metricStr) throws UserException {
if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.USERNAME;
} else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.QUERY_TIME;
} else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_ROWS;
} else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
return WorkloadMetricType.BE_SCAN_BYTES;
WorkloadMetricType metricType = WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase());
if (metricType == null) {
throw new UserException("invalid metric name:" + metricStr);
} else {
return metricType;
}
throw new UserException("invalid metric name:" + metricStr);
}
public String toString() {

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.common.UserException;
public class WorkloadConditionQueryBeMemory implements WorkloadCondition {
private long value;
private WorkloadConditionOperator op;
public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long value) {
this.value = value;
this.op = op;
}
@Override
public boolean eval(String strValue) {
return false;
}
@Override
public WorkloadMetricType getMetricType() {
return WorkloadMetricType.QUERY_BE_MEMORY_BYTES;
}
public static WorkloadConditionQueryBeMemory createWorkloadCondition(WorkloadConditionOperator op,
String value) throws UserException {
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid query be memory value: " + value + ", it requires >= 0");
}
return new WorkloadConditionQueryBeMemory(op, longValue);
}
}

View File

@ -37,9 +37,14 @@ public class WorkloadConditionQueryTime implements WorkloadCondition {
public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value)
throws UserException {
long longValue = Long.parseLong(value);
if (longValue < 0) {
throw new UserException("invalid query time value, " + longValue + ", it requires >= 0");
long longValue = -1;
try {
longValue = Long.parseLong(value);
if (longValue < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new UserException("invalid query time value: " + value + ", it requires >= 0");
}
return new WorkloadConditionQueryTime(op, longValue);
}

View File

@ -18,5 +18,5 @@
package org.apache.doris.resource.workloadschedpolicy;
public enum WorkloadMetricType {
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES
}

View File

@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TWorkloadAction;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadCondition;
@ -31,7 +30,6 @@ import org.apache.doris.thrift.TWorkloadSchedPolicy;
import org.apache.doris.thrift.TopicInfo;
import com.esotericsoftware.minlog.Log;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();
// used for convert fe type to thrift type
private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build();
private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
.put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();
private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
= new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
.put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
.put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
.put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
.put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
.put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
@ -255,12 +234,12 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
List<TWorkloadCondition> condList = new ArrayList();
for (WorkloadConditionMeta cond : conditionMetaList) {
TWorkloadCondition tCond = new TWorkloadCondition();
TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName);
if (metricType == null) {
return null;
}
tCond.setMetricName(metricType);
tCond.setOp(OP_MAP.get(cond.op));
tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op));
tCond.setValue(cond.value);
condList.add(tCond);
}
@ -268,7 +247,7 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
List<TWorkloadAction> actionList = new ArrayList();
for (WorkloadActionMeta action : actionMetaList) {
TWorkloadAction tAction = new TWorkloadAction();
TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action);
if (tActionType == null) {
return null;
}

View File

@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TCompareOperator;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TWorkloadActionType;
import org.apache.doris.thrift.TWorkloadMetricType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@ -80,6 +84,14 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs
.add("WorkloadGroup")
.build();
public static final ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP
= new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>()
.put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
.put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
.put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL)
.put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
.put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build();
public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
= new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build();
@ -93,7 +105,39 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs
public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
= new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build();
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
.add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
// used for convert fe type to thrift type
public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP
= new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>()
.put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
.put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS)
.put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES)
.put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP
= new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>()
.put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP)
.put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build();
public static final Map<String, WorkloadMetricType> STRING_METRIC_MAP = new HashMap<>();
public static final Map<String, WorkloadActionType> STRING_ACTION_MAP = new HashMap<>();
static {
for (WorkloadMetricType metricType : FE_METRIC_SET) {
STRING_METRIC_MAP.put(metricType.toString(), metricType);
}
for (WorkloadMetricType metricType : BE_METRIC_SET) {
STRING_METRIC_MAP.put(metricType.toString(), metricType);
}
for (WorkloadActionType actionType : FE_ACTION_SET) {
STRING_ACTION_MAP.put(actionType.toString(), actionType);
}
for (WorkloadActionType actionType : BE_ACTION_SET) {
STRING_ACTION_MAP.put(actionType.toString(), actionType);
}
}
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

View File

@ -187,6 +187,7 @@ enum TWorkloadMetricType {
QUERY_TIME
BE_SCAN_ROWS
BE_SCAN_BYTES
QUERY_BE_MEMORY_BYTES
}
enum TCompareOperator {

View File

@ -23,6 +23,9 @@ suite("test_workload_sched_policy") {
sql "drop workload policy if exists set_action_policy;"
sql "drop workload policy if exists fe_policy;"
sql "drop workload policy if exists be_policy;"
sql "drop workload policy if exists be_scan_row_policy;"
sql "drop workload policy if exists be_scan_bytes_policy;"
sql "drop workload policy if exists query_be_memory_used;"
// 1 create cancel policy
sql "create workload policy test_cancel_policy " +
@ -106,11 +109,38 @@ suite("test_workload_sched_policy") {
exception "duplicate set_session_variable action args one policy"
}
test {
sql "create workload policy invalid_metric_value_policy conditions(query_be_memory_bytes > '-1') actions(cancel_query);"
exception "invalid"
}
test {
sql "create workload policy invalid_metric_value_policy conditions(query_time > '-1') actions(cancel_query);"
exception "invalid"
}
test {
sql "create workload policy invalid_metric_value_policy conditions(be_scan_rows > '-1') actions(cancel_query);"
exception "invalid"
}
test {
sql "create workload policy invalid_metric_value_policy conditions(be_scan_bytes > '-1') actions(cancel_query);"
exception "invalid"
}
sql "create workload policy be_scan_row_policy conditions(be_scan_rows > 1) actions(cancel_query) properties('enabled'='false');"
sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes > 1) actions(cancel_query) properties('enabled'='false');"
sql "create workload policy query_be_memory_used conditions(query_be_memory_bytes > 1) actions(cancel_query) properties('enabled'='false');"
// drop
sql "drop workload policy test_cancel_policy;"
sql "drop workload policy set_action_policy;"
sql "drop workload policy fe_policy;"
sql "drop workload policy be_policy;"
sql "drop workload policy be_scan_row_policy;"
sql "drop workload policy be_scan_bytes_policy;"
sql "drop workload policy query_be_memory_used;"
qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from information_schema.workload_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;"