diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index f2ff0d1393..9e33c70c41 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -443,6 +443,7 @@ void RuntimeState::export_load_error(const std::string& err_msg) { } int64_t RuntimeState::get_load_mem_limit() { + // TODO: the code is abandoned, it can be deleted after v1.3 if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) { return _query_options.load_mem_limit; } else { diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 016ab933c1..5b388c8a05 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -271,11 +271,11 @@ Note that the comment must start with /*+ and can only follow the SELECT. * `load_mem_limit` - Used to specify the memory limit of the load operation. The default is 0, which means that this variable is not used, and `exec_mem_limit` is used as the memory limit for the load operation. + Used to specify the memory limit of the load operation. The default is 2GB. - This variable is usually used for INSERT operations. Because the INSERT operation has both query and load part. If the user does not set this variable, the respective memory limits of the query and load part are `exec_mem_limit`. Otherwise, the memory of query part of INSERT is limited to `exec_mem_limit`, and the load part is limited to` load_mem_limit`. + Broker Load, Stream Load and Routine Load use `load_mem_limit` by default; if the user specifies the task `exec_mem_limit` parameter when creating a load, the specified value is used. - For other load methods, such as BROKER LOAD, STREAM LOAD, the memory limit still uses `exec_mem_limit`. + The INSERT operation has two parts: query and load. The memory limit of the load part of INSERT is `load_mem_limit`, and the query part is limited to `exec_mem_limit`. 
* `lower_case_table_names` diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 3b938c42a2..c548f452f9 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -267,11 +267,11 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); - `load_mem_limit` - 用于指定导入操作的内存限制。默认为 0,即表示不使用该变量,而采用 `exec_mem_limit` 作为导入操作的内存限制。 + 用于指定所有导入的内存限制。默认是2GB。 - 这个变量仅用于 INSERT 操作。因为 INSERT 操作设计查询和导入两个部分,如果用户不设置此变量,则查询和导入操作各自的内存限制均为 `exec_mem_limit`。否则,INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`。 + 对于 Broker Load, Stream Load 和 Routine Load,默认使用`load_mem_limit`; 如果用户创建任务时指定任务`exec_mem_limit`参数,则使用指定的值。 - 其他导入方式,如 BROKER LOAD,STREAM LOAD 的内存限制依然使用 `exec_mem_limit`。 + 这个变量也用于 INSERT 操作。 INSERT 操作涉及查询和导入两个部分, INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`。 - `lower_case_table_names` diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 4a7732fd26..4d11eeb493 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -296,7 +296,7 @@ public class Load { // resource info if (ConnectContext.get() != null) { job.setResourceInfo(ConnectContext.get().toResourceCtx()); - job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte()); + job.setExecMemLimit(ConnectContext.get().getSessionVariable().getLoadMemLimit()); } // job properties diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 78a0a58015..3acded2647 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -289,6 +289,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl } if (stmt.getExecMemLimit() != -1) { this.execMemLimit = stmt.getExecMemLimit(); + } else if (ConnectContext.get() != null) { + this.execMemLimit = ConnectContext.get().getSessionVariable().getLoadMemLimit(); } if (stmt.getSendBatchParallelism() > 0) { this.sendBatchParallelism = stmt.getSendBatchParallelism(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 026920ccb5..da79ae0410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -382,7 +382,7 @@ public class SessionVariable implements Serializable, Writable { public boolean forwardToMaster = true; @VariableMgr.VarAttr(name = LOAD_MEM_LIMIT) - public long loadMemLimit = 0L; + public long loadMemLimit = 2 * 1024 * 1024 * 1024L; // 2GB as default @VariableMgr.VarAttr(name = USE_V2_ROLLUP) public boolean useV2Rollup = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 569776f291..e362213047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -30,6 +30,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.qe.VariableMgr; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -290,6 +291,8 @@ public class StreamLoadTask implements LoadTaskInfo { } if (request.isSetExecMemLimit()) { execMemLimit = request.getExecMemLimit(); + } else { + execMemLimit = VariableMgr.getDefaultSessionVariable().getLoadMemLimit(); } if 
(request.getFormatType() == TFileFormatType.FORMAT_JSON) { if (request.getJsonpaths() != null) {