[ResourceLimit] Add a property to limit user instance num. (#6159)

Add a property to limit user instance num.
This commit is contained in:
Lijia Liu
2021-07-10 10:15:05 +08:00
committed by GitHub
parent 5a1aa3ec35
commit b5f447b932
17 changed files with 259 additions and 48 deletions

View File

@ -1419,9 +1419,15 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_dynamic_partition_num = 500;
/*
/**
* Control the max num of backup/restore job per db
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;
/**
* Control the default max num of the instance for a user.
*/
@ConfField(mutable = true)
public static int default_max_query_instances = -1;
}

View File

@ -210,6 +210,8 @@ public final class FeMetaVersion {
public static final int VERSION_98 = 98;
// add audit steam load and change the serialization backend method to json
public static final int VERSION_99 = 99;
// for max query instance
public static final int VERSION_100 = 100;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_99;
public static final int VERSION_CURRENT = VERSION_100;
}

View File

@ -32,10 +32,12 @@ import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Sets;
@ -49,6 +51,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
public final class MetricRepo {
private static final Logger LOG = LogManager.getLogger(MetricRepo.class);
@ -302,6 +305,16 @@ public final class MetricRepo {
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", "ms"));
METRIC_REGISTER.register(MetricRegistry.name("palo", "fe", "query", "max_instances_num_per_user"), (Gauge<Integer>) () -> {
try{
return ((QeProcessorImpl)QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream()
.reduce(-1, BinaryOperator.maxBy(Integer::compareTo));
} catch (Throwable ex) {
LOG.warn("Get max_instances_num_per_user error", ex);
return -2;
}
});
// init system metrics
initSystemMetrics();

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.privilege;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Used in
*/
public class CommonUserProperties implements Writable {
@SerializedName("maxConn")
private long maxConn = 100;
@SerializedName("maxQueryInstances")
private long maxQueryInstances = -1;
long getMaxConn() {
return maxConn;
}
long getMaxQueryInstances() {
return maxQueryInstances;
}
void setMaxConn(long maxConn) {
this.maxConn = maxConn;
}
void setMaxQueryInstances(long maxQueryInstances) {
this.maxQueryInstances = maxQueryInstances;
}
public static CommonUserProperties read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, CommonUserProperties.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -1048,6 +1048,15 @@ public class PaloAuth implements Writable {
}
}
public long getMaxQueryInstances(String qualifiedUser) {
readLock();
try {
return propertyMgr.getMaxQueryInstances(qualifiedUser);
} finally {
readUnlock();
}
}
public void getAllDomains(Set<String> allDomains) {
readLock();
try {

View File

@ -39,8 +39,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
@ -58,9 +56,12 @@ import java.util.regex.Pattern;
* This user is just qualified by cluster name, not host which it connected from.
*/
public class UserProperty implements Writable {
private static final Logger LOG = LogManager.getLogger(UserProperty.class);
// common properties
private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections";
private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances";
// common properties end
private static final String PROP_RESOURCE = "resource";
private static final String PROP_QUOTA = "quota";
private static final String PROP_DEFAULT_LOAD_CLUSTER = "default_load_cluster";
@ -73,7 +74,8 @@ public class UserProperty implements Writable {
private String qualifiedUser;
private long maxConn = 100;
private CommonUserProperties commonProperties = new CommonUserProperties();
// Resource belong to this user.
private UserResource resource = new UserResource(1000);
// load cluster
@ -87,20 +89,12 @@ public class UserProperty implements Writable {
*/
private WhiteList whiteList = new WhiteList();
@Deprecated
private byte[] password;
@Deprecated
private boolean isAdmin = false;
@Deprecated
private boolean isSuperuser = false;
@Deprecated
private Map<String, AccessPrivilege> dbPrivMap = Maps.newHashMap();
static {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_USER_CONNECTIONS + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE + ".", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "."
+ DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE));
@ -120,33 +114,17 @@ public class UserProperty implements Writable {
}
public long getMaxConn() {
return maxConn;
return this.commonProperties.getMaxConn();
}
public long getMaxQueryInstances() {
return commonProperties.getMaxQueryInstances();// maxQueryInstances;
}
public WhiteList getWhiteList() {
return whiteList;
}
@Deprecated
public byte[] getPassword() {
return password;
}
@Deprecated
public boolean isAdmin() {
return isAdmin;
}
@Deprecated
public boolean isSuperuser() {
return isSuperuser;
}
@Deprecated
public Map<String, AccessPrivilege> getDbPrivMap() {
return dbPrivMap;
}
public void setPasswordForDomain(String domain, byte[] password, boolean errOnExist) throws DdlException {
if (errOnExist && whiteList.containsDomain(domain)) {
throw new DdlException("Domain " + domain + " of user " + qualifiedUser + " already exists");
@ -163,7 +141,8 @@ public class UserProperty implements Writable {
public void update(List<Pair<String, String>> properties) throws DdlException {
// copy
long newMaxConn = maxConn;
long newMaxConn = this.commonProperties.getMaxConn();
long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances();
UserResource newResource = resource.getCopiedUserResource();
String newDefaultLoadCluster = defaultLoadCluster;
Map<String, DppConfig> newDppConfigs = Maps.newHashMap(clusterToDppConfig);
@ -237,13 +216,25 @@ public class UserProperty implements Writable {
}
newDefaultLoadCluster = value;
} else if (keyArr[0].equalsIgnoreCase(PROP_MAX_QUERY_INSTANCES)) {
// set property "max_query_instances" = "1000"
if (keyArr.length != 1) {
throw new DdlException(PROP_MAX_QUERY_INSTANCES + " format error");
}
try {
newMaxQueryInstances = Long.parseLong(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number");
}
} else {
throw new DdlException("Unknown user property(" + key + ")");
}
}
// set
maxConn = newMaxConn;
this.commonProperties.setMaxConn(newMaxConn);
this.commonProperties.setMaxQueryInstances(newMaxQueryInstances);
resource = newResource;
if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
defaultLoadCluster = newDefaultLoadCluster;
@ -326,7 +317,10 @@ public class UserProperty implements Writable {
String dot = SetUserPropertyVar.DOT_SEPARATOR;
// max user connections
result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(maxConn)));
result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(commonProperties.getMaxConn())));
// max query instance
result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(commonProperties.getMaxQueryInstances())));
// resource
ResourceGroup group = resource.getResource();
@ -404,10 +398,12 @@ public class UserProperty implements Writable {
return userProperty;
}
@Override
public void write(DataOutput out) throws IOException {
// user name
Text.writeString(out, qualifiedUser);
out.writeLong(maxConn);
// user resource
resource.write(out);
@ -419,15 +415,19 @@ public class UserProperty implements Writable {
out.writeBoolean(true);
Text.writeString(out, defaultLoadCluster);
}
out.writeInt(clusterToDppConfig.size());
for (Map.Entry<String, DppConfig> entry : clusterToDppConfig.entrySet()) {
Text.writeString(out, entry.getKey());
entry.getValue().write(out);
}
// whiteList
whiteList.write(out);
// common properties
commonProperties.write(out);
}
public void readFields(DataInput in) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) {
// consume the flag of empty user name
@ -443,19 +443,25 @@ public class UserProperty implements Writable {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) {
int passwordLen = in.readInt();
password = new byte[passwordLen];
byte[] password = new byte[passwordLen];
in.readFully(password);
isAdmin = in.readBoolean();
// boolean isAdmin
in.readBoolean();
if (Catalog.getCurrentCatalogJournalVersion() >= 1) {
isSuperuser = in.readBoolean();
// boolean isSuperuser
in.readBoolean();
}
}
maxConn = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_100) {
long maxConn = in.readLong();
this.commonProperties.setMaxConn(maxConn);
}
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) {
Map<String, AccessPrivilege> dbPrivMap = Maps.newHashMap();
int numPriv = in.readInt();
for (int i = 0; i < numPriv; ++i) {
String dbName = null;
@ -487,6 +493,7 @@ public class UserProperty implements Writable {
}
}
// whiteList
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_21) {
whiteList.readFields(in);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_69) {
@ -502,5 +509,10 @@ public class UserProperty implements Writable {
}
}
}
// common properties
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_100) {
this.commonProperties = CommonUserProperties.read(in);
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.mysql.privilege;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
@ -122,6 +123,14 @@ public class UserPropertyMgr implements Writable {
return existProperty.getMaxConn();
}
public long getMaxQueryInstances(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
if (existProperty == null) {
return Config.default_max_query_instances;
}
return existProperty.getMaxQueryInstances();
}
public int getPropertyMapSize() {
return propertyMap.size();
}

View File

@ -412,6 +412,8 @@ public class Coordinator {
traceInstance();
QeProcessorImpl.INSTANCE.registerInstances(queryId, instanceIds.size());
// create result receiver
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);

View File

@ -33,6 +33,8 @@ public interface QeProcessor {
void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException;
void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException;
void unregisterQuery(TUniqueId queryId);
Map<String, QueryStatisticsItem> getQueryStatistics();

View File

@ -17,6 +17,7 @@
package org.apache.doris.qe;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@ -28,19 +29,25 @@ import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
public final class QeProcessorImpl implements QeProcessor {
private static final Logger LOG = LogManager.getLogger(QeProcessorImpl.class);
private Map<TUniqueId, QueryInfo> coordinatorMap;
private Map<TUniqueId, Integer> queryToInstancesNum;
private Map<String, AtomicInteger> userToInstancesCount;
public static final QeProcessor INSTANCE;
static {
@ -50,10 +57,12 @@ public final class QeProcessorImpl implements QeProcessor {
private ExecutorService writeProfileExecutor;
private QeProcessorImpl() {
coordinatorMap = Maps.newConcurrentMap();
coordinatorMap = new ConcurrentHashMap<>();
// write profile to ProfileManager when query is running.
writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100,
"profile-write-pool", true);
queryToInstancesNum = new ConcurrentHashMap<>();
userToInstancesCount = new ConcurrentHashMap<>();
}
@Override
@ -79,10 +88,58 @@ public final class QeProcessorImpl implements QeProcessor {
}
}
public void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException {
if (!coordinatorMap.containsKey(queryId)) {
throw new UserException("query not exists in coordinatorMap:" + DebugUtil.printId(queryId));
}
QueryInfo queryInfo = coordinatorMap.get(queryId);
if (queryInfo.getConnectContext() != null &&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
String user = queryInfo.getConnectContext().getQualifiedUser();
long maxQueryInstances = queryInfo.getConnectContext().getCatalog().getAuth().getMaxQueryInstances(user);
if (maxQueryInstances <= 0) {
maxQueryInstances = Config.default_max_query_instances;
}
if (maxQueryInstances > 0) {
AtomicInteger currentCount = userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0));
// Many query can reach here.
if (instancesNum + currentCount.get() > maxQueryInstances) {
throw new UserException("reach max_query_instances " + maxQueryInstances);
}
}
queryToInstancesNum.put(queryId, instancesNum);
userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0)).addAndGet(instancesNum);
}
}
public Map<String, Integer> getInstancesNumPerUser() {
return Maps.transformEntries(userToInstancesCount, (__, value) -> value != null ? value.get() : 0);
}
@Override
public void unregisterQuery(TUniqueId queryId) {
if (coordinatorMap.remove(queryId) != null) {
QueryInfo queryInfo = coordinatorMap.remove(queryId);
if (queryInfo != null) {
LOG.info("deregister query id {}", DebugUtil.printId(queryId));
if (queryInfo.getConnectContext() != null &&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
Integer num = queryToInstancesNum.remove(queryId);
if (num != null) {
String user = queryInfo.getConnectContext().getQualifiedUser();
AtomicInteger instancesNum = userToInstancesCount.get(user);
if (instancesNum == null) {
LOG.warn("WTF?? query {} in queryToInstancesNum but not in userToInstancesCount",
DebugUtil.printId(queryId)
);
} else {
instancesNum.addAndGet(-num);
}
}
}
} else {
LOG.warn("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
}
}

View File

@ -64,6 +64,7 @@ public class UserPropertyTest {
properties.add(Pair.create("quota.normal", "102"));
properties.add(Pair.create("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2"));
properties.add(Pair.create("default_load_cluster", "dpp-cluster"));
properties.add(Pair.create("max_qUERY_instances", "3000"));
UserProperty userProperty = new UserProperty();
userProperty.update(properties);
@ -72,6 +73,7 @@ public class UserPropertyTest {
Assert.assertEquals(102, userProperty.getResource().getShareByGroup().get("normal").intValue());
Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath());
Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster());
Assert.assertEquals(3000, userProperty.getMaxQueryInstances());
// fetch property
List<List<String>> rows = userProperty.fetchProperty();
@ -89,6 +91,8 @@ public class UserPropertyTest {
Assert.assertEquals("/user/palo2", value);
} else if (key.equalsIgnoreCase("default_load_cluster")) {
Assert.assertEquals("dpp-cluster", value);
} else if (key.equalsIgnoreCase("max_query_instances")) {
Assert.assertEquals("3000", value);
}
}