[Enhancement](load) remove load mem limit (#13111)
#12716 removed the mem limit for single load task, in this PR I propose to remove the session variable load_mem_limit, to avoid confusing. For compatibility, load_mem_limit in thrift not removed, the value is set equal to exec_mem_limit in FE
This commit is contained in:
@ -272,14 +272,6 @@ Note that the comment must start with /*+ and can only follow the SELECT.
|
||||
|
||||
Show Doris's license. No other effect.
|
||||
|
||||
* `load_mem_limit`
|
||||
|
||||
Used to specify the memory limit of the load operation. The default is 2GB.
|
||||
|
||||
Broker Load, Stream Load and Routine Load use `load_mem_limit` by default; if the user specifies the task `exec_mem_limit` parameter when creating a load, the specified value is used.
|
||||
|
||||
The INSERT operation has two parts: query and import. The memory limit of the load part of INSERT is `load_mem_limit`, and the query part is limited to `exec_mem_limit`.
|
||||
|
||||
* `lower_case_table_names`
|
||||
|
||||
Used to control whether the user table name is case-sensitive.
|
||||
|
||||
@ -54,15 +54,13 @@ Super user privileges:
|
||||
|
||||
exec_mem_limit: Limit the memory usage of the query. See the introduction to the session variable `exec_mem_limit` for details. -1 means not set.
|
||||
|
||||
load_mem_limit: Limit imported memory usage. See the introduction to the session variable `load_mem_limit` for details. -1 means not set.
|
||||
|
||||
resource.cpu_share: CPU resource allocation. (obsolete)
|
||||
|
||||
load_cluster.{cluster_name}.priority: Assign priority to the specified cluster, which can be HIGH or NORMAL
|
||||
|
||||
resource_tags: Specifies the user's resource tag permissions.
|
||||
|
||||
Note: If the three attributes `cpu_resource_limit`, `exec_mem_limit`, `load_mem_limit` are not set, the value in the session variable will be used by default.
|
||||
Note: If the attributes `cpu_resource_limit`, `exec_mem_limit` are not set, the value in the session variable will be used by default.
|
||||
|
||||
Ordinary user rights:
|
||||
|
||||
@ -158,12 +156,6 @@ Data, etl program automatically retains the next use.
|
||||
SET PROPERTY FOR 'jack' 'exec_mem_limit' = '2147483648';
|
||||
````
|
||||
|
||||
13. Modify the user's import memory usage limit, in bytes
|
||||
|
||||
```sql
|
||||
SET PROPERTY FOR 'jack' 'load_mem_limit' = '2147483648';
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
SET, PROPERTY
|
||||
|
||||
@ -268,14 +268,6 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3);
|
||||
|
||||
显示 Doris 的 License。无其他作用。
|
||||
|
||||
- `load_mem_limit`
|
||||
|
||||
用于指定所有导入的内存限制。默认是2GB。
|
||||
|
||||
对于 Broker Load, Stream Load 和 Routine Load,默认使用`load_mem_limit`; 如果用户创建任务时指定任务`exec_mem_limit`参数,则使用指定的值。
|
||||
|
||||
这个变量也用于 INSERT 操作。 INSERT 操作设计查询和导入两个部分, INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`。
|
||||
|
||||
- `lower_case_table_names`
|
||||
|
||||
用于控制用户表表名大小写是否敏感。
|
||||
|
||||
@ -54,15 +54,13 @@ key:
|
||||
|
||||
exec_mem_limit: 限制查询的内存使用。详见会话变量 `exec_mem_limit` 的介绍。-1 表示未设置。
|
||||
|
||||
load_mem_limit: 限制导入的内存使用。详见会话变量 `load_mem_limit` 的介绍。-1 表示未设置。
|
||||
|
||||
resource.cpu_share: cpu资源分配。(已废弃)
|
||||
|
||||
load_cluster.{cluster_name}.priority: 为指定的cluster分配优先级,可以为 HIGH 或 NORMAL
|
||||
|
||||
resource_tags:指定用户的资源标签权限。
|
||||
|
||||
注:`cpu_resource_limit`, `exec_mem_limit`, `load_mem_limit` 三个属性如果未设置,则默认使用会话变量中值。
|
||||
注:`cpu_resource_limit`, `exec_mem_limit` 两个属性如果未设置,则默认使用会话变量中值。
|
||||
|
||||
普通用户权限:
|
||||
|
||||
@ -158,12 +156,6 @@ key:
|
||||
SET PROPERTY FOR 'jack' 'exec_mem_limit' = '2147483648';
|
||||
```
|
||||
|
||||
13. 修改用户的导入内存使用限制,单位字节
|
||||
|
||||
```sql
|
||||
SET PROPERTY FOR 'jack' 'load_mem_limit' = '2147483648';
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
SET, PROPERTY
|
||||
|
||||
@ -295,7 +295,7 @@ public class Load {
|
||||
// resource info
|
||||
if (ConnectContext.get() != null) {
|
||||
job.setResourceInfo(ConnectContext.get().toResourceCtx());
|
||||
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getLoadMemLimit());
|
||||
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte());
|
||||
}
|
||||
|
||||
// job properties
|
||||
|
||||
@ -289,8 +289,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
}
|
||||
if (stmt.getExecMemLimit() != -1) {
|
||||
this.execMemLimit = stmt.getExecMemLimit();
|
||||
} else if (ConnectContext.get() != null) {
|
||||
this.execMemLimit = ConnectContext.get().getSessionVariable().getLoadMemLimit();
|
||||
}
|
||||
if (stmt.getSendBatchParallelism() > 0) {
|
||||
this.sendBatchParallelism = stmt.getSendBatchParallelism();
|
||||
|
||||
@ -50,9 +50,6 @@ public class CommonUserProperties implements Writable {
|
||||
// user level exec_mem_limit, if > 0, will overwrite the exec_mem_limit in session variable
|
||||
@SerializedName("execMemLimit")
|
||||
private long execMemLimit = -1;
|
||||
// user level load_mem_limit, if > 0, will overwrite the load_mem_limit in session variable
|
||||
@SerializedName("loadMemLimit")
|
||||
private long loadMemLimit = -1;
|
||||
|
||||
private String[] sqlBlockRulesSplit = {};
|
||||
|
||||
@ -114,14 +111,6 @@ public class CommonUserProperties implements Writable {
|
||||
this.execMemLimit = execMemLimit;
|
||||
}
|
||||
|
||||
public long getLoadMemLimit() {
|
||||
return loadMemLimit;
|
||||
}
|
||||
|
||||
public void setLoadMemLimit(long loadMemLimit) {
|
||||
this.loadMemLimit = loadMemLimit;
|
||||
}
|
||||
|
||||
public static CommonUserProperties read(DataInput in) throws IOException {
|
||||
String json = Text.readString(in);
|
||||
CommonUserProperties commonUserProperties = GsonUtils.GSON.fromJson(json, CommonUserProperties.class);
|
||||
|
||||
@ -1408,15 +1408,6 @@ public class PaloAuth implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public long getLoadMemLimit(String qualifiedUser) {
|
||||
readLock();
|
||||
try {
|
||||
return propertyMgr.getLoadMemLimit(qualifiedUser);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void getAllDomains(Set<String> allDomains) {
|
||||
readLock();
|
||||
try {
|
||||
|
||||
@ -63,7 +63,6 @@ public class UserProperty implements Writable {
|
||||
private static final String PROP_SQL_BLOCK_RULES = "sql_block_rules";
|
||||
private static final String PROP_CPU_RESOURCE_LIMIT = "cpu_resource_limit";
|
||||
private static final String PROP_EXEC_MEM_LIMIT = "exec_mem_limit";
|
||||
private static final String PROP_LOAD_MEM_LIMIT = "load_mem_limit";
|
||||
// advanced properties end
|
||||
|
||||
private static final String PROP_LOAD_CLUSTER = "load_cluster";
|
||||
@ -109,7 +108,6 @@ public class UserProperty implements Writable {
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_CPU_RESOURCE_LIMIT + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE));
|
||||
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE));
|
||||
|
||||
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE));
|
||||
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE));
|
||||
@ -156,10 +154,6 @@ public class UserProperty implements Writable {
|
||||
return commonProperties.getExecMemLimit();
|
||||
}
|
||||
|
||||
public long getLoadMemLimit() {
|
||||
return commonProperties.getLoadMemLimit();
|
||||
}
|
||||
|
||||
public void setPasswordForDomain(String domain, byte[] password, boolean errOnExist) throws DdlException {
|
||||
if (errOnExist && whiteList.containsDomain(domain)) {
|
||||
throw new DdlException("Domain " + domain + " of user " + qualifiedUser + " already exists");
|
||||
@ -182,7 +176,6 @@ public class UserProperty implements Writable {
|
||||
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
|
||||
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
|
||||
long execMemLimit = this.commonProperties.getExecMemLimit();
|
||||
long loadMemLimit = this.commonProperties.getLoadMemLimit();
|
||||
|
||||
UserResource newResource = resource.getCopiedUserResource();
|
||||
String newDefaultLoadCluster = defaultLoadCluster;
|
||||
@ -321,8 +314,6 @@ public class UserProperty implements Writable {
|
||||
} else if (keyArr[0].equalsIgnoreCase(PROP_EXEC_MEM_LIMIT)) {
|
||||
// set property "exec_mem_limit" = "2147483648";
|
||||
execMemLimit = getLongProperty(key, value, keyArr, PROP_EXEC_MEM_LIMIT);
|
||||
} else if (keyArr[0].equalsIgnoreCase(PROP_LOAD_MEM_LIMIT)) {
|
||||
loadMemLimit = getLongProperty(key, value, keyArr, PROP_LOAD_MEM_LIMIT);
|
||||
} else {
|
||||
throw new DdlException("Unknown user property(" + key + ")");
|
||||
}
|
||||
@ -335,7 +326,6 @@ public class UserProperty implements Writable {
|
||||
this.commonProperties.setCpuResourceLimit(cpuResourceLimit);
|
||||
this.commonProperties.setResourceTags(resourceTags);
|
||||
this.commonProperties.setExecMemLimit(execMemLimit);
|
||||
this.commonProperties.setLoadMemLimit(loadMemLimit);
|
||||
resource = newResource;
|
||||
if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
|
||||
defaultLoadCluster = newDefaultLoadCluster;
|
||||
@ -462,9 +452,6 @@ public class UserProperty implements Writable {
|
||||
// exec mem limit
|
||||
result.add(Lists.newArrayList(PROP_EXEC_MEM_LIMIT, String.valueOf(commonProperties.getExecMemLimit())));
|
||||
|
||||
// load mem limit
|
||||
result.add(Lists.newArrayList(PROP_LOAD_MEM_LIMIT, String.valueOf(commonProperties.getLoadMemLimit())));
|
||||
|
||||
// resource tag
|
||||
result.add(Lists.newArrayList(PROP_RESOURCE_TAGS, Joiner.on(", ").join(commonProperties.getResourceTags())));
|
||||
|
||||
|
||||
@ -274,15 +274,6 @@ public class UserPropertyMgr implements Writable {
|
||||
return existProperty.getExecMemLimit();
|
||||
}
|
||||
|
||||
public long getLoadMemLimit(String qualifiedUser) {
|
||||
UserProperty existProperty = propertyMap.get(qualifiedUser);
|
||||
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
|
||||
if (existProperty == null) {
|
||||
return -1;
|
||||
}
|
||||
return existProperty.getLoadMemLimit();
|
||||
}
|
||||
|
||||
private UserProperty getLdapPropertyIfNull(String qualifiedUser, UserProperty existProperty) {
|
||||
if (existProperty == null && Env.getCurrentEnv().getAuth().getLdapManager().doesUserExist(qualifiedUser)) {
|
||||
return LDAP_PROPERTY;
|
||||
|
||||
@ -562,9 +562,6 @@ public class ConnectProcessor {
|
||||
if (request.isSetQueryTimeout()) {
|
||||
ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout());
|
||||
}
|
||||
if (request.isSetLoadMemLimit()) {
|
||||
ctx.getSessionVariable().setLoadMemLimit(request.loadMemLimit);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> traceCarrier = new HashMap<>();
|
||||
|
||||
@ -307,12 +307,6 @@ public class Coordinator {
|
||||
this.queryOptions.setInitialReservationTotalClaims(memLimit);
|
||||
this.queryOptions.setBufferPoolLimit(memLimit);
|
||||
}
|
||||
// set load mem limit
|
||||
memLimit = Env.getCurrentEnv().getAuth().getLoadMemLimit(qualifiedUser);
|
||||
if (memLimit > 0) {
|
||||
// overwrite the load_mem_limit from session variable;
|
||||
this.queryOptions.setLoadMemLimit(memLimit);
|
||||
}
|
||||
}
|
||||
|
||||
private void initQueryOptions(ConnectContext context) {
|
||||
|
||||
@ -100,16 +100,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
// user can set instance num after exchange, no need to be equal to nums of before exchange
|
||||
public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";
|
||||
public static final String SHOW_HIDDEN_COLUMNS = "show_hidden_columns";
|
||||
/*
|
||||
* configure the mem limit of load process on BE.
|
||||
* Previously users used exec_mem_limit to set memory limits.
|
||||
* To maintain compatibility, the default value of load_mem_limit is 0,
|
||||
* which means that the load memory limit is still using exec_mem_limit.
|
||||
* Users can set a value greater than zero to explicitly specify the load memory limit.
|
||||
* This variable is mainly for INSERT operation, because INSERT operation has both query and load part.
|
||||
* Using only the exec_mem_limit variable does not make a good distinction of memory limit between the two parts.
|
||||
*/
|
||||
public static final String LOAD_MEM_LIMIT = "load_mem_limit";
|
||||
public static final String USE_V2_ROLLUP = "use_v2_rollup";
|
||||
public static final String TEST_MATERIALIZED_VIEW = "test_materialized_view";
|
||||
public static final String REWRITE_COUNT_DISTINCT_TO_BITMAP_HLL = "rewrite_count_distinct_to_bitmap_hll";
|
||||
@ -399,9 +389,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
|
||||
public boolean forwardToMaster = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
|
||||
public long loadMemLimit = 2 * 1024 * 1024 * 1024L; // 2GB as default
|
||||
|
||||
@VariableMgr.VarAttr(name = USE_V2_ROLLUP)
|
||||
public boolean useV2Rollup = false;
|
||||
|
||||
@ -603,10 +590,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return maxExecMemByte;
|
||||
}
|
||||
|
||||
public long getLoadMemLimit() {
|
||||
return loadMemLimit;
|
||||
}
|
||||
|
||||
public int getQueryTimeoutS() {
|
||||
return queryTimeoutS;
|
||||
}
|
||||
@ -754,10 +737,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
this.sqlQuoteShowCreate = sqlQuoteShowCreate;
|
||||
}
|
||||
|
||||
public void setLoadMemLimit(long loadMemLimit) {
|
||||
this.loadMemLimit = loadMemLimit;
|
||||
}
|
||||
|
||||
public void setQueryTimeoutS(int queryTimeoutS) {
|
||||
this.queryTimeoutS = queryTimeoutS;
|
||||
}
|
||||
@ -1196,7 +1175,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
tResult.setBatchSize(batchSize);
|
||||
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
|
||||
tResult.setLoadMemLimit(loadMemLimit);
|
||||
|
||||
if (maxScanKeyNum > -1) {
|
||||
tResult.setMaxScanKeyNum(maxScanKeyNum);
|
||||
@ -1394,9 +1372,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
if (queryOptions.isSetQueryTimeout()) {
|
||||
setQueryTimeoutS(queryOptions.getQueryTimeout());
|
||||
}
|
||||
if (queryOptions.isSetLoadMemLimit()) {
|
||||
setLoadMemLimit(queryOptions.getLoadMemLimit());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1406,7 +1381,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
TQueryOptions queryOptions = new TQueryOptions();
|
||||
queryOptions.setMemLimit(maxExecMemByte);
|
||||
queryOptions.setQueryTimeout(queryTimeoutS);
|
||||
queryOptions.setLoadMemLimit(loadMemLimit);
|
||||
return queryOptions;
|
||||
}
|
||||
|
||||
|
||||
@ -30,7 +30,6 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.loadv2.LoadTask;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TStreamLoadPutRequest;
|
||||
@ -291,8 +290,6 @@ public class StreamLoadTask implements LoadTaskInfo {
|
||||
}
|
||||
if (request.isSetExecMemLimit()) {
|
||||
execMemLimit = request.getExecMemLimit();
|
||||
} else {
|
||||
execMemLimit = VariableMgr.getDefaultSessionVariable().getLoadMemLimit();
|
||||
}
|
||||
if (request.getFormatType() == TFileFormatType.FORMAT_JSON) {
|
||||
if (request.getJsonpaths() != null) {
|
||||
|
||||
@ -278,13 +278,8 @@ public class ResourceTagQueryTest {
|
||||
long execMemLimit = Env.getCurrentEnv().getAuth().getExecMemLimit(PaloAuth.ROOT_USER);
|
||||
Assert.assertEquals(1000000, execMemLimit);
|
||||
|
||||
String setLoadMemLimitStr = "set property for 'root' 'load_mem_limit' = '2000000';";
|
||||
ExceptionChecker.expectThrowsNoException(() -> setProperty(setLoadMemLimitStr));
|
||||
long loadMemLimit = Env.getCurrentEnv().getAuth().getLoadMemLimit(PaloAuth.ROOT_USER);
|
||||
Assert.assertEquals(2000000, loadMemLimit);
|
||||
|
||||
List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(PaloAuth.ROOT_USER);
|
||||
Assert.assertEquals(17, userProps.size());
|
||||
Assert.assertEquals(16, userProps.size());
|
||||
}
|
||||
|
||||
private void checkTableReplicaAllocation(OlapTable tbl) throws InterruptedException {
|
||||
|
||||
@ -60,7 +60,7 @@ public class SessionVariablesTest {
|
||||
public void testForwardQueryOptions() {
|
||||
TQueryOptions queryOptions = sessionVariable.getQueryOptionVariables();
|
||||
Assert.assertTrue(queryOptions.isSetMemLimit());
|
||||
Assert.assertTrue(queryOptions.isSetLoadMemLimit());
|
||||
Assert.assertFalse(queryOptions.isSetLoadMemLimit());
|
||||
Assert.assertTrue(queryOptions.isSetQueryTimeout());
|
||||
|
||||
queryOptions.setQueryTimeout(123);
|
||||
|
||||
Reference in New Issue
Block a user