[enhancement](load) make default load mem limit configurable (#12348)

* make LoadMemLimit valid for broker load, stream load and routine load

Co-authored-by: wuhangze <wuhangze@jd.com>
This commit is contained in:
Henry2SS
2022-09-12 10:25:01 +08:00
committed by GitHub
parent fc605779ed
commit ecfefae715
7 changed files with 14 additions and 8 deletions

View File

@ -443,6 +443,7 @@ void RuntimeState::export_load_error(const std::string& err_msg) {
}
int64_t RuntimeState::get_load_mem_limit() {
// TODO: the code is abandoned, it can be deleted after v1.3
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) {
return _query_options.load_mem_limit;
} else {

View File

@ -271,11 +271,11 @@ Note that the comment must start with /*+ and can only follow the SELECT.
* `load_mem_limit`
Used to specify the memory limit of the load operation. The default is 0, which means that this variable is not used, and `exec_mem_limit` is used as the memory limit for the load operation.
Used to specify the memory limit of the load operation. The default is 2GB.
This variable is usually used for INSERT operations. Because the INSERT operation has both query and load part. If the user does not set this variable, the respective memory limits of the query and load part are `exec_mem_limit`. Otherwise, the memory of query part of INSERT is limited to `exec_mem_limit`, and the load part is limited to` load_mem_limit`.
Broker Load, Stream Load and Routine Load use `load_mem_limit` by default; if the user specifies the task `exec_mem_limit` parameter when creating a load, the specified value is used.
For other load methods, such as BROKER LOAD, STREAM LOAD, the memory limit still uses `exec_mem_limit`.
The INSERT operation has two parts: query and import. The memory limit of the load part of INSERT is `load_mem_limit`, and the query part is limited to `exec_mem_limit`.
* `lower_case_table_names`

View File

@ -267,11 +267,11 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3);
- `load_mem_limit`
用于指定导入操作的内存限制。默认为 0,即表示不使用该变量,而采用 `exec_mem_limit` 作为导入操作的内存限制
用于指定所有导入的内存限制。默认是2GB
这个变量仅用于 INSERT 操作。因为 INSERT 操作设计查询和导入两个部分,如果用户不设置此变量,则查询和导入操作各自的内存限制均为 `exec_mem_limit`。否则,INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`
对于 Broker Load, Stream Load 和 Routine Load,默认使用`load_mem_limit`; 如果用户创建任务时指定任务`exec_mem_limit`参数,则使用指定的值
其他导入方式,如 BROKER LOAD,STREAM LOAD 的内存限制依然使用 `exec_mem_limit`
这个变量也用于 INSERT 操作。 INSERT 操作设计查询和导入两个部分, INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`
- `lower_case_table_names`

View File

@ -296,7 +296,7 @@ public class Load {
// resource info
if (ConnectContext.get() != null) {
job.setResourceInfo(ConnectContext.get().toResourceCtx());
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte());
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getLoadMemLimit());
}
// job properties

View File

@ -289,6 +289,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
if (stmt.getExecMemLimit() != -1) {
this.execMemLimit = stmt.getExecMemLimit();
} else if (ConnectContext.get() != null) {
this.execMemLimit = ConnectContext.get().getSessionVariable().getLoadMemLimit();
}
if (stmt.getSendBatchParallelism() > 0) {
this.sendBatchParallelism = stmt.getSendBatchParallelism();

View File

@ -382,7 +382,7 @@ public class SessionVariable implements Serializable, Writable {
public boolean forwardToMaster = true;
@VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
public long loadMemLimit = 0L;
public long loadMemLimit = 2 * 1024 * 1024 * 1024L; // 2GB as default
@VariableMgr.VarAttr(name = USE_V2_ROLLUP)
public boolean useV2Rollup = false;

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
@ -290,6 +291,8 @@ public class StreamLoadTask implements LoadTaskInfo {
}
if (request.isSetExecMemLimit()) {
execMemLimit = request.getExecMemLimit();
} else {
execMemLimit = VariableMgr.getDefaultSessionVariable().getLoadMemLimit();
}
if (request.getFormatType() == TFileFormatType.FORMAT_JSON) {
if (request.getJsonpaths() != null) {