[chore](user) Add user property parallel_fragment_exec_instance_num (#28447)

This commit is contained in:
Xinyi Zou
2023-12-19 18:33:01 +08:00
committed by GitHub
parent 15e31d74e3
commit 73a3d84c5e
7 changed files with 69 additions and 1 deletions

View File

@ -959,6 +959,12 @@ public class Auth implements Writable {
Env.getCurrentEnv().getEditLog().logUpdateUserProperty(propertyInfo);
}
LOG.info("finished to set properties for user: {}", user);
} catch (DdlException e) {
if (isReplay && e.getMessage().contains("Unknown user property")) {
LOG.warn("ReplayUpdateUserProperty failed, maybe FE rolled back version, " + e.getMessage());
} else {
throw e;
}
} finally {
writeUnlock();
}
@ -1000,6 +1006,15 @@ public class Auth implements Writable {
}
}
public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
readLock();
try {
return propertyMgr.getParallelFragmentExecInstanceNum(qualifiedUser);
} finally {
readUnlock();
}
}
public String[] getSqlBlockRules(String qualifiedUser) {
readLock();
try {

View File

@ -45,6 +45,8 @@ public class CommonUserProperties implements Writable {
// The maximum total number of query instances that the user is allowed to send from this FE
@SerializedName("maxQueryInstances")
private long maxQueryInstances = -1;
@SerializedName("parallelFragmentExecInstanceNum")
private int parallelFragmentExecInstanceNum = -1;
@SerializedName("sqlBlockRules")
private String sqlBlockRules = "";
@SerializedName("cpuResourceLimit")
@ -75,6 +77,10 @@ public class CommonUserProperties implements Writable {
return maxQueryInstances;
}
int getParallelFragmentExecInstanceNum() {
return parallelFragmentExecInstanceNum;
}
String getSqlBlockRules() {
return sqlBlockRules;
}
@ -91,6 +97,10 @@ public class CommonUserProperties implements Writable {
this.maxQueryInstances = maxQueryInstances;
}
void setParallelFragmentExecInstanceNum(int parallelFragmentExecInstanceNum) {
this.parallelFragmentExecInstanceNum = parallelFragmentExecInstanceNum;
}
void setSqlBlockRules(String sqlBlockRules) {
this.sqlBlockRules = sqlBlockRules;
setSqlBlockRulesSplit(sqlBlockRules);

View File

@ -63,6 +63,7 @@ public class UserProperty implements Writable {
// advanced properties
private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections";
private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances";
private static final String PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
private static final String PROP_RESOURCE_TAGS = "resource_tags";
private static final String PROP_RESOURCE = "resource";
private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules";
@ -113,6 +114,8 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "."
+ DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + "$",
Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_SQL_BLOCK_RULES + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
@ -154,6 +157,10 @@ public class UserProperty implements Writable {
return commonProperties.getMaxQueryInstances(); // maxQueryInstances;
}
public int getParallelFragmentExecInstanceNum() {
return commonProperties.getParallelFragmentExecInstanceNum();
}
public String[] getSqlBlockRules() {
return commonProperties.getSqlBlockRulesSplit();
}
@ -187,6 +194,7 @@ public class UserProperty implements Writable {
// copy
long newMaxConn = this.commonProperties.getMaxConn();
long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances();
int newParallelFragmentExecInstanceNum = this.commonProperties.getParallelFragmentExecInstanceNum();
String sqlBlockRules = this.commonProperties.getSqlBlockRules();
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
@ -242,6 +250,17 @@ public class UserProperty implements Writable {
} catch (NumberFormatException e) {
throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
// set property "parallel_fragment_exec_instance_num" = "16"
if (keyArr.length != 1) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " format error");
}
try {
newParallelFragmentExecInstanceNum = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_SQL_BLOCK_RULES)) {
// set property "sql_block_rules" = "test_rule1,test_rule2"
if (keyArr.length != 1) {
@ -337,6 +356,7 @@ public class UserProperty implements Writable {
// set
this.commonProperties.setMaxConn(newMaxConn);
this.commonProperties.setMaxQueryInstances(newMaxQueryInstances);
this.commonProperties.setParallelFragmentExecInstanceNum(newParallelFragmentExecInstanceNum);
this.commonProperties.setSqlBlockRules(sqlBlockRules);
this.commonProperties.setCpuResourceLimit(cpuResourceLimit);
this.commonProperties.setResourceTags(resourceTags);
@ -456,6 +476,10 @@ public class UserProperty implements Writable {
result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES,
String.valueOf(commonProperties.getMaxQueryInstances())));
// parallel fragment exec instance num
result.add(Lists.newArrayList(PROP_PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
String.valueOf(commonProperties.getParallelFragmentExecInstanceNum())));
// sql block rules
result.add(Lists.newArrayList(PROP_SQL_BLOCK_RULES, commonProperties.getSqlBlockRules()));

View File

@ -119,6 +119,15 @@ public class UserPropertyMgr implements Writable {
return existProperty.getMaxQueryInstances();
}
public int getParallelFragmentExecInstanceNum(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
return -1;
}
return existProperty.getParallelFragmentExecInstanceNum();
}
public Set<Tag> getResourceTags(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);

View File

@ -1969,6 +1969,14 @@ public class SessionVariable implements Serializable, Writable {
}
public int getParallelExecInstanceNum() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) {
int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
.getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
if (userParallelExecInstanceNum > 0) {
return userParallelExecInstanceNum;
}
}
if (getEnablePipelineEngine() && parallelPipelineTaskNum == 0) {
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
int autoInstance = (size + 1) / 2;