[Improvement][SET-PROPERTY] Support for set query_timeout property (#13444)
This commit is contained in:
@ -78,6 +78,8 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
throw new AfterConnectedException("Reach limit of connections");
|
||||
}
|
||||
context.setStartTime();
|
||||
context.setUserQueryTimeout(
|
||||
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
|
||||
ConnectProcessor processor = new ConnectProcessor(context);
|
||||
context.startAcceptQuery(processor);
|
||||
} catch (AfterConnectedException e) {
|
||||
|
||||
@ -51,6 +51,9 @@ public class CommonUserProperties implements Writable {
|
||||
@SerializedName("execMemLimit")
|
||||
private long execMemLimit = -1;
|
||||
|
||||
@SerializedName("queryTimeout")
|
||||
private long queryTimeout = -1;
|
||||
|
||||
private String[] sqlBlockRulesSplit = {};
|
||||
|
||||
long getMaxConn() {
|
||||
@ -111,6 +114,14 @@ public class CommonUserProperties implements Writable {
|
||||
this.execMemLimit = execMemLimit;
|
||||
}
|
||||
|
||||
public long getQueryTimeout() {
|
||||
return queryTimeout;
|
||||
}
|
||||
|
||||
public void setQueryTimeout(long timeout) {
|
||||
this.queryTimeout = timeout;
|
||||
}
|
||||
|
||||
public static CommonUserProperties read(DataInput in) throws IOException {
|
||||
String json = Text.readString(in);
|
||||
CommonUserProperties commonUserProperties = GsonUtils.GSON.fromJson(json, CommonUserProperties.class);
|
||||
|
||||
@ -1363,6 +1363,15 @@ public class PaloAuth implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public long getQueryTimeout(String qualifiedUser) {
|
||||
readLock();
|
||||
try {
|
||||
return propertyMgr.getQueryTimeout(qualifiedUser);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long getMaxQueryInstances(String qualifiedUser) {
|
||||
readLock();
|
||||
try {
|
||||
|
||||
@ -63,6 +63,7 @@ public class UserProperty implements Writable {
|
||||
private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules";
|
||||
private static final String PROP_CPU_RESOURCE_LIMIT = "cpu_resource_limit";
|
||||
private static final String PROP_EXEC_MEM_LIMIT = "exec_mem_limit";
|
||||
private static final String PROP_USER_QUERY_TIMEOUT = "query_timeout";
|
||||
// advanced properties end
|
||||
|
||||
private static final String PROP_LOAD_CLUSTER = "load_cluster";
|
||||
@ -108,6 +109,7 @@ public class UserProperty implements Writable {
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_QUERY_TIMEOUT + "$", Pattern.CASE_INSENSITIVE));
|
||||
|
||||
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE));
|
||||
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE));
|
||||
@ -130,6 +132,10 @@ public class UserProperty implements Writable {
|
||||
return this.commonProperties.getMaxConn();
|
||||
}
|
||||
|
||||
public long getQueryTimeout() {
|
||||
return this.commonProperties.getQueryTimeout();
|
||||
}
|
||||
|
||||
public long getMaxQueryInstances() {
|
||||
return commonProperties.getMaxQueryInstances(); // maxQueryInstances;
|
||||
}
|
||||
@ -176,6 +182,7 @@ public class UserProperty implements Writable {
|
||||
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
|
||||
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
|
||||
long execMemLimit = this.commonProperties.getExecMemLimit();
|
||||
long queryTimeout = this.commonProperties.getQueryTimeout();
|
||||
|
||||
UserResource newResource = resource.getCopiedUserResource();
|
||||
String newDefaultLoadCluster = defaultLoadCluster;
|
||||
@ -314,6 +321,15 @@ public class UserProperty implements Writable {
|
||||
} else if (keyArr[0].equalsIgnoreCase(PROP_EXEC_MEM_LIMIT)) {
|
||||
// set property "exec_mem_limit" = "2147483648";
|
||||
execMemLimit = getLongProperty(key, value, keyArr, PROP_EXEC_MEM_LIMIT);
|
||||
} else if (keyArr[0].equalsIgnoreCase(PROP_USER_QUERY_TIMEOUT)) {
|
||||
if (keyArr.length != 1) {
|
||||
throw new DdlException(PROP_MAX_USER_CONNECTIONS + " format error");
|
||||
}
|
||||
try {
|
||||
queryTimeout = Long.parseLong(value);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException(PROP_USER_QUERY_TIMEOUT + " is not number");
|
||||
}
|
||||
} else {
|
||||
throw new DdlException("Unknown user property(" + key + ")");
|
||||
}
|
||||
@ -326,6 +342,7 @@ public class UserProperty implements Writable {
|
||||
this.commonProperties.setCpuResourceLimit(cpuResourceLimit);
|
||||
this.commonProperties.setResourceTags(resourceTags);
|
||||
this.commonProperties.setExecMemLimit(execMemLimit);
|
||||
this.commonProperties.setQueryTimeout(queryTimeout);
|
||||
resource = newResource;
|
||||
if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
|
||||
defaultLoadCluster = newDefaultLoadCluster;
|
||||
@ -452,6 +469,9 @@ public class UserProperty implements Writable {
|
||||
// exec mem limit
|
||||
result.add(Lists.newArrayList(PROP_EXEC_MEM_LIMIT, String.valueOf(commonProperties.getExecMemLimit())));
|
||||
|
||||
// query timeout
|
||||
result.add(Lists.newArrayList(PROP_USER_QUERY_TIMEOUT, String.valueOf(commonProperties.getQueryTimeout())));
|
||||
|
||||
// resource tag
|
||||
result.add(Lists.newArrayList(PROP_RESOURCE_TAGS, Joiner.on(", ").join(commonProperties.getResourceTags())));
|
||||
|
||||
|
||||
@ -130,6 +130,15 @@ public class UserPropertyMgr implements Writable {
|
||||
property.update(properties);
|
||||
}
|
||||
|
||||
public long getQueryTimeout(String qualifiedUser) {
|
||||
UserProperty existProperty = propertyMap.get(qualifiedUser);
|
||||
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
|
||||
if (existProperty == null) {
|
||||
return 0;
|
||||
}
|
||||
return existProperty.getQueryTimeout();
|
||||
}
|
||||
|
||||
public long getMaxConn(String qualifiedUser) {
|
||||
UserProperty existProperty = propertyMap.get(qualifiedUser);
|
||||
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
|
||||
|
||||
@ -147,6 +147,12 @@ public class ConnectContext {
|
||||
|
||||
private SessionContext sessionContext;
|
||||
|
||||
private long userQueryTimeout;
|
||||
|
||||
public void setUserQueryTimeout(long queryTimeout) {
|
||||
this.userQueryTimeout = queryTimeout;
|
||||
}
|
||||
|
||||
private StatementContext statementContext;
|
||||
|
||||
public SessionContext getSessionContext() {
|
||||
@ -562,12 +568,23 @@ public class ConnectContext {
|
||||
killConnection = true;
|
||||
}
|
||||
} else {
|
||||
if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
|
||||
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
|
||||
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS());
|
||||
if (userQueryTimeout > 0) {
|
||||
// user set query_timeout property
|
||||
if (delta > userQueryTimeout * 1000) {
|
||||
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
|
||||
getMysqlChannel().getRemoteHostPortString(), userQueryTimeout);
|
||||
|
||||
// Only kill
|
||||
killFlag = true;
|
||||
killFlag = true;
|
||||
}
|
||||
} else {
|
||||
// default use session query_timeout
|
||||
if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
|
||||
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
|
||||
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS());
|
||||
|
||||
// Only kill
|
||||
killFlag = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (killFlag) {
|
||||
|
||||
@ -193,6 +193,7 @@ public class ConnectScheduler {
|
||||
return;
|
||||
}
|
||||
|
||||
context.setUserQueryTimeout(context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
|
||||
context.setStartTime();
|
||||
ConnectProcessor processor = new ConnectProcessor(context);
|
||||
processor.loop();
|
||||
|
||||
@ -107,6 +107,7 @@ public class UserPropertyTest {
|
||||
properties.add(Pair.of("max_qUERY_instances", "3000"));
|
||||
properties.add(Pair.of("sql_block_rules", "rule1,rule2"));
|
||||
properties.add(Pair.of("cpu_resource_limit", "2"));
|
||||
properties.add(Pair.of("query_timeout", "500"));
|
||||
|
||||
UserProperty userProperty = new UserProperty();
|
||||
userProperty.update(properties);
|
||||
@ -118,6 +119,7 @@ public class UserPropertyTest {
|
||||
Assert.assertEquals(3000, userProperty.getMaxQueryInstances());
|
||||
Assert.assertEquals(new String[]{"rule1", "rule2"}, userProperty.getSqlBlockRules());
|
||||
Assert.assertEquals(2, userProperty.getCpuResourceLimit());
|
||||
Assert.assertEquals(500, userProperty.getQueryTimeout());
|
||||
|
||||
// fetch property
|
||||
List<List<String>> rows = userProperty.fetchProperty();
|
||||
@ -141,6 +143,8 @@ public class UserPropertyTest {
|
||||
Assert.assertEquals("rule1,rule2", value);
|
||||
} else if (key.equalsIgnoreCase("cpu_resource_limit")) {
|
||||
Assert.assertEquals("2", value);
|
||||
} else if (key.equalsIgnoreCase("query_timeout")) {
|
||||
Assert.assertEquals("500", value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -279,7 +279,7 @@ public class ResourceTagQueryTest {
|
||||
Assert.assertEquals(1000000, execMemLimit);
|
||||
|
||||
List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(PaloAuth.ROOT_USER);
|
||||
Assert.assertEquals(16, userProps.size());
|
||||
Assert.assertEquals(17, userProps.size());
|
||||
}
|
||||
|
||||
private void checkTableReplicaAllocation(OlapTable tbl) throws InterruptedException {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.mysql.MysqlCapability;
|
||||
import org.apache.doris.mysql.MysqlChannel;
|
||||
import org.apache.doris.mysql.MysqlCommand;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import mockit.Expectations;
|
||||
@ -43,6 +44,10 @@ public class ConnectContextTest {
|
||||
private Env env;
|
||||
@Mocked
|
||||
private ConnectScheduler connectScheduler;
|
||||
@Mocked
|
||||
private PaloAuth paloAuth;
|
||||
@Mocked
|
||||
private String qualifiedUser;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -166,6 +171,14 @@ public class ConnectContextTest {
|
||||
ctx.checkTimeout(now);
|
||||
Assert.assertTrue(ctx.isKilled());
|
||||
|
||||
// user query timeout
|
||||
ctx.setStartTime();
|
||||
ctx.setUserQueryTimeout(1);
|
||||
now = ctx.getStartTime() + paloAuth.getQueryTimeout(qualifiedUser) * 1000 + 1;
|
||||
ctx.setExecutor(executor);
|
||||
ctx.checkTimeout(now);
|
||||
Assert.assertTrue(ctx.isKilled());
|
||||
|
||||
// Kill
|
||||
ctx.kill(true);
|
||||
Assert.assertTrue(ctx.isKilled());
|
||||
|
||||
Reference in New Issue
Block a user