diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 410844c62f..cb203f0689 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -513,8 +513,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { _close_timer = ADD_TIMER(_profile, "CloseTime"); _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); - // use query mem limit as load mem limit for remote load channels - _load_mem_limit = state->query_mem_tracker()->limit(); + _load_mem_limit = state->get_load_mem_limit(); // open all channels auto& partitions = _partition->get_partitions(); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 0717b84c48..e5188b1489 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -531,5 +531,14 @@ Status RuntimeState::get_codegen(LlvmCodeGen** codegen) { Status RuntimeState::StartSpilling(MemTracker* mem_tracker) { return Status::InternalError("Mem limit exceeded."); } + +int64_t RuntimeState::get_load_mem_limit() { + if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) { + return _query_options.load_mem_limit; + } else { + return _query_mem_tracker->limit(); + } +} + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 41c399d46e..9589035745 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -501,6 +501,10 @@ public: /// Helper to call QueryState::StartSpilling(). Status StartSpilling(MemTracker* mem_tracker); + // get mem limit for load channel + // if load mem limit is not set, or is zero, using query mem limit instead. + int64_t get_load_mem_limit(); + private: // Allow TestEnv to set block_mgr manually for testing. 
friend class TestEnv; diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md index 7bdb821b09..5616f478e1 100644 --- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md @@ -215,7 +215,7 @@ Label 的另一个作用,是防止用户重复导入相同的数据。**强烈 + strict\_mode - Broker load 导入可以开启 strict mode 模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode 为开启。 + Broker load 导入可以开启 strict mode 模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode 为关闭。 strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md index 1f69874c14..478a8ab60f 100644 --- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md @@ -150,7 +150,7 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或 * strict\_mode - Routine load 导入可以开启 strict mode 模式。开启方式为在 job\_properties 中增加 ```"strict_mode" = "true"``` 。默认的 strict mode 为开启。 + Routine load 导入可以开启 strict mode 模式。开启方式为在 job\_properties 中增加 ```"strict_mode" = "true"``` 。默认的 strict mode 为关闭。 strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: diff --git a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md index 91a4df57de..5ab9188243 100644 --- a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md @@ -139,7 +139,7 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的 + strict\_mode - Stream load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明 ```strict_mode=true``` 。默认的 strict mode 为开启。 + Stream load 导入可以开启 strict mode 
模式。开启方式为在 HEADER 中声明 ```strict_mode=true``` 。默认的 strict mode 为关闭。 strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: diff --git a/docs/documentation/cn/administrator-guide/session-variables.md b/docs/documentation/cn/administrator-guide/variables.md similarity index 91% rename from docs/documentation/cn/administrator-guide/session-variables.md rename to docs/documentation/cn/administrator-guide/variables.md index 73a1d70a95..5b25a53871 100644 --- a/docs/documentation/cn/administrator-guide/session-variables.md +++ b/docs/documentation/cn/administrator-guide/variables.md @@ -52,14 +52,8 @@ SET time_zone = "Asia/Shanghai"; SET GLOBAL exec_mem_limit = 137438953472 ``` -同时,变量设置也支持常量表达式。如: - -``` -SET exec_mem_limit = 10 * 1024 * 1024 * 1024; -SET forward_to_master = concat('tr', 'u', 'e'); -``` - > 注1:只有 ADMIN 用户可以设置变量的全局生效。 +> 注2:全局生效的变量不影响当前会话的变量值,仅影响新的会话中的变量。 支持全局生效的变量包括: @@ -71,7 +65,14 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `exec_mem_limit` * `batch_size` * `parallel_fragment_exec_instance_num` -* `parallel_fragment_exec_instance_num` +* `parallel_exchange_instance_num` + +同时,变量设置也支持常量表达式。如: + +``` +SET exec_mem_limit = 10 * 1024 * 1024 * 1024; +SET forward_to_master = concat('tr', 'u', 'e'); +``` ## 支持的变量 @@ -209,6 +210,14 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `license` 显示 Doris 的 License。无其他作用。 + +* `load_mem_limit` + + 用于指定导入操作的内存限制。默认为 0,即表示不使用该变量,而采用 `exec_mem_limit` 作为导入操作的内存限制。 + + 这个变量仅用于 INSERT 操作。因为 INSERT 操作涉及查询和导入两个部分,如果用户不设置此变量,则查询和导入操作各自的内存限制均为 `exec_mem_limit`。否则,INSERT 的查询部分内存限制为 `exec_mem_limit`,而导入部分限制为 `load_mem_limit`。 + + 其他导入方式,如 BROKER LOAD,STREAM LOAD 的内存限制依然使用 `exec_mem_limit`。 * `lower_case_table_names` @@ -236,7 +245,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); 在一个分布式的查询执行计划中,上层节点通常有一个或多个 exchange node 用于接收来自下层节点在不同 BE 上的执行实例的数据。通常 exchange node 数量等于下层节点执行实例数量。 - 如果用户需要减少上层节点的 exchange node 数量,可以设置该值。 + 在一些聚合查询场景下,如果底层需要扫描的数据量较大,但聚合之后的数据量很小,则可以尝试修改此变量为一个较小的值,可以降低此类查询的资源开销。如在 DUPLICATE KEY 
明细模型上进行聚合查询的场景。 * `parallel_fragment_exec_instance_num` diff --git a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md index 134682e0ef..8c52029a7f 100644 --- a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md +++ b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md @@ -211,7 +211,7 @@ The following is a detailed explanation of some parameters of the import operati + strict\_mode - Broker load 导入可以开启 strict mode 模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode 为开启。 + Broker load can enable strict mode. It is enabled with ```properties ("strict_mode" = "true")```. Strict mode is off by default. The strict mode means that the column type conversion in the import process is strictly filtered. The strategy of strict filtering is as follows: diff --git a/docs/documentation/en/administrator-guide/load-data/routine-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/routine-load-manual_EN.md index 9079330af6..44f1dfdc58 100644 --- a/docs/documentation/en/administrator-guide/load-data/routine-load-manual_EN.md +++ b/docs/documentation/en/administrator-guide/load-data/routine-load-manual_EN.md @@ -150,7 +150,7 @@ The detailed syntax for creating a routine load task can be connected to Doris a * strict\_mode - Routine load load can turn on strict mode mode. The way to open it is to add ```"strict_mode" = "true"``` to job\_properties. The default strict mode is on. + Routine load can enable strict mode. The way to enable it is to add ```"strict_mode" = "true"``` to job\_properties. Strict mode is off by default. The strict mode mode means strict filtering of column type conversions during the load process. 
The strict filtering strategy is as follows: diff --git a/docs/documentation/en/administrator-guide/session-variables_EN.md b/docs/documentation/en/administrator-guide/variables_EN.md similarity index 91% rename from docs/documentation/en/administrator-guide/session-variables_EN.md rename to docs/documentation/en/administrator-guide/variables_EN.md index 46f754f4d2..12dddfaf5a 100644 --- a/docs/documentation/en/administrator-guide/session-variables_EN.md +++ b/docs/documentation/en/administrator-guide/variables_EN.md @@ -52,14 +52,8 @@ For global-level, set by `SET GLOBALE var_name=xxx;`. Such as: SET GLOBAL exec_mem_limit = 137438953472 ``` -At the same time, variable settings also support constant expressions. Such as: - -``` -SET exec_mem_limit = 10 * 1024 * 1024 * 1024; -SET forward_to_master = concat('tr', 'u', 'e'); -``` - > Note 1: Only ADMIN users can set variable at global-level. +> Note 2: Setting a variable at global-level does not change its value in the current session; it only takes effect in new sessions. Variables that support global-level setting include: @@ -71,7 +65,14 @@ Variables that support global-level setting include: * `exec_mem_limit` * `batch_size` * `parallel_fragment_exec_instance_num` -* `parallel_fragment_exec_instance_num` +* `parallel_exchange_instance_num` + +At the same time, variable settings also support constant expressions. Such as: + +``` +SET exec_mem_limit = 10 * 1024 * 1024 * 1024; +SET forward_to_master = concat('tr', 'u', 'e'); +``` ## Supported variables @@ -210,6 +211,14 @@ Variables that support global-level setting include: * `license` Show Doris's license. No other effect. + +* `load_mem_limit` + + Used to specify the memory limit of the load operation. The default is 0, which means that this variable is not used, and `exec_mem_limit` is used as the memory limit for the load operation. + + This variable is usually used for INSERT operations, because an INSERT operation has both a query part and a load part. 
If the user does not set this variable, the memory limits of both the query part and the load part are `exec_mem_limit`. Otherwise, the query part of INSERT is limited to `exec_mem_limit`, and the load part is limited to `load_mem_limit`. + + For other load methods, such as BROKER LOAD, STREAM LOAD, the memory limit still uses `exec_mem_limit`. * `lower_case_table_names` @@ -237,7 +246,7 @@ Variables that support global-level setting include: In a distributed query execution plan, the upper node usually has one or more exchange nodes for receiving data from the execution instances of the lower nodes on different BEs. Usually the number of exchange nodes is equal to the number of execution instances of the lower nodes. - This value can be set if the user needs to reduce the number of exchange nodes of the upper node. + In some aggregate query scenarios, if the amount of data to be scanned at the bottom is large, but the amount of data after aggregation is small, you can try to modify this variable to a smaller value, which can reduce the resource overhead of such queries. An example is an aggregation query on a table that uses the DUPLICATE KEY data model. 
* `parallel_fragment_exec_instance_num` diff --git a/fe/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java index 0602869a71..188a94c348 100644 --- a/fe/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java @@ -19,37 +19,19 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.ParseUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; public class AlterDatabaseQuotaStmt extends DdlStmt { private String dbName; private String quotaQuantity; private long quota; - private static ImmutableMap validUnitMultiplier = - ImmutableMap.builder().put("B", 1L) - .put("K", 1024L) - .put("KB", 1024L) - .put("M", 1024L * 1024) - .put("MB", 1024L * 1024) - .put("G", 1024L * 1024 * 1024) - .put("GB", 1024L * 1024 * 1024) - .put("T", 1024L * 1024 * 1024 * 1024) - .put("TB", 1024L * 1024 * 1024 * 1024) - .put("P", 1024L * 1024 * 1024 * 1024 * 1024) - .put("PB", 1024L * 1024 * 1024 * 1024 * 1024).build(); - - private String quotaPattern = "(\\d+)(\\D*)"; public AlterDatabaseQuotaStmt(String dbName, String quotaQuantity) { this.dbName = dbName; @@ -64,34 +46,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { return quota; } - private void analyzeQuotaQuantity() throws UserException { - Pattern r = Pattern.compile(quotaPattern); - Matcher m = r.matcher(quotaQuantity); - if (m.matches()) { - try { - quota = 
Long.parseLong(m.group(1)); - } catch(NumberFormatException nfe) { - throw new AnalysisException("invalid quota:" + m.group(1)); - } - if (quota < 0L) { - throw new AnalysisException("Quota must larger than 0"); - } - - String unit = "B"; - String tmpUnit = m.group(2); - if (!Strings.isNullOrEmpty(tmpUnit)) { - unit = tmpUnit.toUpperCase(); - } - if (validUnitMultiplier.containsKey(unit)) { - quota = quota * validUnitMultiplier.get(unit); - } else { - throw new AnalysisException("invalid unit:" + tmpUnit); - } - } else { - throw new AnalysisException("invalid quota expression:" + quotaQuantity); - } - } - + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -104,7 +59,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); } dbName = ClusterNamespace.getFullName(getClusterName(), dbName); - analyzeQuotaQuantity(); + quota = ParseUtil.analyzeDataVolumn(quotaQuantity); } @Override diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 0a55ce8106..2bb1020835 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -38,5 +38,5 @@ public class FeConstants { // general model // Current meta data version. 
Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_66; + public static int meta_version = FeMetaVersion.VERSION_67; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 6c829845fa..0ea579d032 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -142,4 +142,6 @@ public final class FeMetaVersion { public static final int VERSION_65 = 65; // routine load/stream load persist session variables public static final int VERSION_66 = 66; + // load_mem_limit session variable + public static final int VERSION_67 = 67; } diff --git a/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java new file mode 100644 index 0000000000..613cb9581d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ParseUtil { + private static ImmutableMap validDataVolumnUnitMultiplier = + ImmutableMap.builder().put("B", 1L) + .put("K", 1024L) + .put("KB", 1024L) + .put("M", 1024L * 1024) + .put("MB", 1024L * 1024) + .put("G", 1024L * 1024 * 1024) + .put("GB", 1024L * 1024 * 1024) + .put("T", 1024L * 1024 * 1024 * 1024) + .put("TB", 1024L * 1024 * 1024 * 1024) + .put("P", 1024L * 1024 * 1024 * 1024 * 1024) + .put("PB", 1024L * 1024 * 1024 * 1024 * 1024).build(); + + private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)"); + + public static long analyzeDataVolumn(String dataVolumnStr) throws UserException { + long dataVolumn = 0; + Matcher m = dataVolumnPattern.matcher(dataVolumnStr); + if (m.matches()) { + try { + dataVolumn = Long.parseLong(m.group(1)); + } catch (NumberFormatException nfe) { + throw new AnalysisException("invalid data volumn:" + m.group(1)); + } + if (dataVolumn < 0L) { + throw new AnalysisException("Data volumn must larger than 0"); + } + + String unit = "B"; + String tmpUnit = m.group(2); + if (!Strings.isNullOrEmpty(tmpUnit)) { + unit = tmpUnit.toUpperCase(); + } + if (validDataVolumnUnitMultiplier.containsKey(unit)) { + dataVolumn = dataVolumn * validDataVolumnUnitMultiplier.get(unit); + } else { + throw new AnalysisException("invalid unit:" + tmpUnit); + } + } else { + throw new AnalysisException("invalid data volumn expression:" + dataVolumnStr); + } + return dataVolumn; + } + +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 93d25a1c07..fa42fcf332 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -95,7 +95,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected long timeoutSecond = Config.broker_load_default_timeout_second; protected long execMemLimit = 2147483648L; // 2GB; protected double maxFilterRatio = 0; - protected boolean strictMode = true; + protected boolean strictMode = false; // default is false protected String timezone = TimeUtils.DEFAULT_TIME_ZONE; @Deprecated protected boolean deleteFlag = false; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 3ee8a43756..64d09f9a72 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -103,6 +103,14 @@ public class LoadLoadingTask extends LoadTask { planner.getFragments(), planner.getScanNodes(), db.getClusterName(), planner.getTimezone()); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); + /* + * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. + * And the variable 'load_mem_limit' does not make any effect. + * However, in order to ensure the consistency of semantics when executing on the BE side, + * and to prevent subsequent modification from incorrectly setting the load_mem_limit, + * here we use exec_mem_limit to directly override the load_mem_limit property. 
+ */ + curCoordinator.setLoadMemLimit(execMemLimit); curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); try { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 4d1518408f..95aaf1abfd 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -97,7 +97,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public static final long DEFAULT_MAX_INTERVAL_SECOND = 10; public static final long DEFAULT_MAX_BATCH_ROWS = 200000; public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB - public static final boolean DEFAULT_STRICT_MODE = true; + public static final boolean DEFAULT_STRICT_MODE = false; // default is false protected static final String STAR_STRING = "*"; /* diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 8a29838258..942e4f8e05 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -1028,7 +1028,6 @@ public class EditLog { logEdit(OperationType.OP_GLOBAL_VARIABLE, variable); } - public void logCreateCluster(Cluster cluster) { logEdit(OperationType.OP_CREATE_CLUSTER, cluster); } diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 8d26857bf8..b2491f9b82 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -145,6 +145,8 @@ public class StreamLoadPlanner { queryOptions.setQuery_type(TQueryType.LOAD); queryOptions.setQuery_timeout(streamLoadTask.getTimeout()); queryOptions.setMem_limit(streamLoadTask.getMemLimit()); + // for stream load, we 
use exec_mem_limit to limit the memory usage of load channel. + queryOptions.setLoad_mem_limit(streamLoadTask.getMemLimit()); params.setQuery_options(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index c2ea9724eb..500c1de59d 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -345,6 +345,12 @@ public class ConnectProcessor { if (request.isSetSqlMode()) { ctx.getSessionVariable().setSqlMode(request.sqlMode); } + if (request.isSetLoadMemLimit()) { + ctx.getSessionVariable().setLoadMemLimit(request.loadMemLimit); + } + if (request.isSetEnableStrictMode()) { + ctx.getSessionVariable().setEnableInsertStrict(request.enableStrictMode); + } ctx.setThreadLocalInfo(); diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 5cf4ea4485..5e273b6461 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -190,8 +190,6 @@ public class Coordinator { // parallel execute private final TUniqueId nextInstanceId; - private boolean isQueryCoordinator; - // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); @@ -278,6 +276,10 @@ public class Coordinator { this.queryOptions.setMem_limit(execMemoryLimit); } + public void setLoadMemLimit(long loadMemLimit) { + this.queryOptions.setLoad_mem_limit(loadMemLimit); + } + public void setTimeout(int timeout) { this.queryOptions.setQuery_timeout(timeout); } diff --git a/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 7abbb5c071..59912e3dd7 
100644 --- a/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -83,6 +83,8 @@ public class MasterOpExecutor { params.setUser_ip(ctx.getRemoteIP()); params.setTime_zone(ctx.getSessionVariable().getTimeZone()); params.setStmt_id(ctx.getStmtId()); + params.setLoadMemLimit(ctx.getSessionVariable().getLoadMemLimit()); + params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict()); LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 4b0833dd2a..c719e86aac 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -21,15 +21,18 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TQueryOptions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.JSONObject; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; // System variable public class SessionVariable implements Serializable, Writable { @@ -75,6 +78,16 @@ public class SessionVariable implements Serializable, Writable { public static final String FORWARD_TO_MASTER = "forward_to_master"; // user can set instance num after exchange, no need to be equal to nums of before exchange public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num"; + /* + * configure the mem limit of load process on BE. + * Previously users used exec_mem_limit to set memory limits. 
+ * To maintain compatibility, the default value of load_mem_limit is 0, + * which means that the load memory limit is still using exec_mem_limit. + * Users can set a value greater than zero to explicitly specify the load memory limit. + * This variable is mainly for INSERT operation, because INSERT operation has both query and load part. + * Using only the exec_mem_limit variable does not make a good distinction of memory limit between the two parts. + */ + public static final String LOAD_MEM_LIMIT = "load_mem_limit"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -197,10 +210,17 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FORWARD_TO_MASTER) private boolean forwardToMaster = false; + @VariableMgr.VarAttr(name = LOAD_MEM_LIMIT) + private long loadMemLimit = 0L; + public long getMaxExecMemByte() { return maxExecMemByte; } + public long getLoadMemLimit() { + return loadMemLimit; + } + public int getQueryTimeoutS() { return queryTimeoutS; } @@ -225,142 +245,74 @@ public class SessionVariable implements Serializable, Writable { return autoCommit; } - public void setAutoCommit(boolean autoCommit) { - this.autoCommit = autoCommit; - } - public String getTxIsolation() { return txIsolation; } - public void setTxIsolation(String txIsolation) { - this.txIsolation = txIsolation; - } - public String getCharsetClient() { return charsetClient; } - public void setCharsetClient(String charsetClient) { - this.charsetClient = charsetClient; - } - public String getCharsetConnection() { return charsetConnection; } - public void setCharsetConnection(String charsetConnection) { - this.charsetConnection = charsetConnection; - } - public String getCharsetResults() { return charsetResults; } - public void setCharsetResults(String charsetResults) { - this.charsetResults = charsetResults; - } - public String getCharsetServer() { return charsetServer; } - public void setCharsetServer(String charsetServer) 
{ - this.charsetServer = charsetServer; - } - public String getCollationConnection() { return collationConnection; } - public void setCollationConnection(String collationConnection) { - this.collationConnection = collationConnection; - } - public String getCollationDatabase() { return collationDatabase; } - public void setCollationDatabase(String collationDatabase) { - this.collationDatabase = collationDatabase; - } - public String getCollationServer() { return collationServer; } - public void setCollationServer(String collationServer) { - this.collationServer = collationServer; - } - public boolean isSqlAutoIsNull() { return sqlAutoIsNull; } - public void setSqlAutoIsNull(boolean sqlAutoIsNull) { - this.sqlAutoIsNull = sqlAutoIsNull; - } - public long getSqlSelectLimit() { return sqlSelectLimit; } - public void setSqlSelectLimit(long sqlSelectLimit) { - this.sqlSelectLimit = sqlSelectLimit; - } - public int getMaxAllowedPacket() { return maxAllowedPacket; } - public void setMaxAllowedPacket(int maxAllowedPacket) { - this.maxAllowedPacket = maxAllowedPacket; - } - public int getAutoIncrementIncrement() { return autoIncrementIncrement; } - public void setAutoIncrementIncrement(int autoIncrementIncrement) { - this.autoIncrementIncrement = autoIncrementIncrement; - } - public int getQueryCacheType() { return queryCacheType; } - public void setQueryCacheType(int queryCacheType) { - this.queryCacheType = queryCacheType; - } - public int getInteractiveTimeout() { return interactiveTimeout; } - public void setInteractiveTimeout(int interactiveTimeout) { - this.interactiveTimeout = interactiveTimeout; - } - - public void setWaitTimeout(int waitTimeout) { - this.waitTimeout = waitTimeout; + public int getWaitTimeout() { + return waitTimeout; } public int getNetWriteTimeout() { return netWriteTimeout; } - public void setNetWriteTimeout(int netWriteTimeout) { - this.netWriteTimeout = netWriteTimeout; - } - public int getNetReadTimeout() { return netReadTimeout; } - public 
void setNetReadTimeout(int netReadTimeout) { - this.netReadTimeout = netReadTimeout; - } - public String getTimeZone() { return timeZone; } @@ -373,26 +325,14 @@ public class SessionVariable implements Serializable, Writable { return sqlSafeUpdates; } - public void setSqlSafeUpdates(int sqlSafeUpdates) { - this.sqlSafeUpdates = sqlSafeUpdates; - } - public int getNetBufferLength() { return netBufferLength; } - public void setNetBufferLength(int netBufferLength) { - this.netBufferLength = netBufferLength; - } - public int getCodegenLevel() { return codegenLevel; } - public void setCodegenLevel(int codegenLevel) { - this.codegenLevel = codegenLevel; - } - public void setMaxExecMemByte(long maxExecMemByte) { if (maxExecMemByte < MIN_EXEC_MEM_LIMIT) { this.maxExecMemByte = MIN_EXEC_MEM_LIMIT; @@ -401,12 +341,12 @@ public class SessionVariable implements Serializable, Writable { } } - public void setQueryTimeoutS(int queryTimeoutS) { - this.queryTimeoutS = queryTimeoutS; + public void setLoadMemLimit(long loadMemLimit) { + this.loadMemLimit = loadMemLimit; } - public void setReportSucc(boolean isReportSucc) { - this.isReportSucc = isReportSucc; + public void setQueryTimeoutS(int queryTimeoutS) { + this.queryTimeoutS = queryTimeoutS; } public String getResourceGroup() { @@ -421,10 +361,6 @@ public class SessionVariable implements Serializable, Writable { return disableColocateJoin; } - public void setDisableColocateJoin(boolean disableColocateJoin) { - this.disableColocateJoin = disableColocateJoin; - } - public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } @@ -433,29 +369,16 @@ public class SessionVariable implements Serializable, Writable { return exchangeInstanceParallel; } - public void setParallelExecInstanceNum(int parallelExecInstanceNum) { - if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) { - this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM; - } else if (parallelExecInstanceNum > MAX_EXEC_INSTANCE_NUM) { - 
this.parallelExecInstanceNum = MAX_EXEC_INSTANCE_NUM; - } else { - this.parallelExecInstanceNum = parallelExecInstanceNum; - } + public boolean getEnableInsertStrict() { return enableInsertStrict; } + + public void setEnableInsertStrict(boolean enableInsertStrict) { + this.enableInsertStrict = enableInsertStrict; } - public boolean getEnableInsertStrict() { return enableInsertStrict; } - public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; } - - - // Serialize to thrift object public boolean getForwardToMaster() { return forwardToMaster; } - public void setForwardToMaster(boolean forwardToMaster) { - this.forwardToMaster = forwardToMaster; - } - // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { @@ -474,88 +397,140 @@ public class SessionVariable implements Serializable, Writable { tResult.setBatch_size(batchSize); tResult.setDisable_stream_preaggregations(disableStreamPreaggregations); + tResult.setLoad_mem_limit(loadMemLimit); return tResult; } @Override public void write(DataOutput out) throws IOException { - out.writeInt(codegenLevel); - out.writeInt(netBufferLength); - out.writeInt(sqlSafeUpdates); - Text.writeString(out, timeZone); - out.writeInt(netReadTimeout); - out.writeInt(netWriteTimeout); - out.writeInt(waitTimeout); - out.writeInt(interactiveTimeout); - out.writeInt(queryCacheType); - out.writeInt(autoIncrementIncrement); - out.writeInt(maxAllowedPacket); - out.writeLong(sqlSelectLimit); - out.writeBoolean(sqlAutoIsNull); - Text.writeString(out, collationDatabase); - Text.writeString(out, collationConnection); - Text.writeString(out, charsetServer); - Text.writeString(out, charsetResults); - Text.writeString(out, charsetConnection); - Text.writeString(out, charsetClient); - Text.writeString(out, txIsolation); - out.writeBoolean(autoCommit); - Text.writeString(out, resourceGroup); - out.writeLong(sqlMode); - out.writeBoolean(isReportSucc); - 
out.writeInt(queryTimeoutS); - out.writeLong(maxExecMemByte); - Text.writeString(out, collationServer); - out.writeInt(batchSize); - out.writeBoolean(disableStreamPreaggregations); - out.writeInt(parallelExecInstanceNum); - out.writeInt(exchangeInstanceParallel); + JSONObject root = new JSONObject(); + try { + for (Field field : SessionVariable.class.getDeclaredFields()) { + VarAttr attr = field.getAnnotation(VarAttr.class); + if (attr == null) { + continue; + } + switch (field.getType().getSimpleName()) { + case "boolean": + root.put(attr.name(), (Boolean) field.get(this)); + break; + case "int": + root.put(attr.name(), (Integer) field.get(this)); + break; + case "long": + root.put(attr.name(), (Long) field.get(this)); + break; + case "float": + root.put(attr.name(), (Float) field.get(this)); + break; + case "double": + root.put(attr.name(), (Double) field.get(this)); + break; + case "String": + root.put(attr.name(), (String) field.get(this)); + break; + default: + // Unsupported type variable. 
+ throw new IOException("invalid type: " + field.getType().getSimpleName()); + } + } + } catch (Exception e) { + throw new IOException("failed to write session variable: " + e.getMessage()); + } + Text.writeString(out, root.toString()); } @Override public void readFields(DataInput in) throws IOException { - codegenLevel = in.readInt(); - netBufferLength = in.readInt(); - sqlSafeUpdates = in.readInt(); - timeZone = Text.readString(in); - netReadTimeout = in.readInt(); - netWriteTimeout = in.readInt(); - waitTimeout = in.readInt(); - interactiveTimeout = in.readInt(); - queryCacheType = in.readInt(); - autoIncrementIncrement = in.readInt(); - maxAllowedPacket = in.readInt(); - sqlSelectLimit = in.readLong(); - sqlAutoIsNull = in.readBoolean(); - collationDatabase = Text.readString(in); - collationConnection = Text.readString(in); - charsetServer = Text.readString(in); - charsetResults = Text.readString(in); - charsetConnection = Text.readString(in); - charsetClient = Text.readString(in); - txIsolation = Text.readString(in); - autoCommit = in.readBoolean(); - resourceGroup = Text.readString(in); - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) { - sqlMode = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_67) { + codegenLevel = in.readInt(); + netBufferLength = in.readInt(); + sqlSafeUpdates = in.readInt(); + timeZone = Text.readString(in); + netReadTimeout = in.readInt(); + netWriteTimeout = in.readInt(); + waitTimeout = in.readInt(); + interactiveTimeout = in.readInt(); + queryCacheType = in.readInt(); + autoIncrementIncrement = in.readInt(); + maxAllowedPacket = in.readInt(); + sqlSelectLimit = in.readLong(); + sqlAutoIsNull = in.readBoolean(); + collationDatabase = Text.readString(in); + collationConnection = Text.readString(in); + charsetServer = Text.readString(in); + charsetResults = Text.readString(in); + charsetConnection = Text.readString(in); + charsetClient = Text.readString(in); + 
txIsolation = Text.readString(in); + autoCommit = in.readBoolean(); + resourceGroup = Text.readString(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_65) { + sqlMode = in.readLong(); + } else { + // read old version SQL mode + Text.readString(in); + sqlMode = 0L; + } + isReportSucc = in.readBoolean(); + queryTimeoutS = in.readInt(); + maxExecMemByte = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) { + collationServer = Text.readString(in); + } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) { + batchSize = in.readInt(); + disableStreamPreaggregations = in.readBoolean(); + parallelExecInstanceNum = in.readInt(); + } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { + exchangeInstanceParallel = in.readInt(); + } } else { - // read old version SQL mode - Text.readString(in); - sqlMode = 0L; + readFromJson(in); } - isReportSucc = in.readBoolean(); - queryTimeoutS = in.readInt(); - maxExecMemByte = in.readLong(); - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_37) { - collationServer = Text.readString(in); - } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) { - batchSize = in.readInt(); - disableStreamPreaggregations = in.readBoolean(); - parallelExecInstanceNum = in.readInt(); - } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { - exchangeInstanceParallel = in.readInt(); + } + + private void readFromJson(DataInput in) throws IOException { + String json = Text.readString(in); + JSONObject root = new JSONObject(json); + try { + for (Field field : SessionVariable.class.getDeclaredFields()) { + VarAttr attr = field.getAnnotation(VarAttr.class); + if (attr == null) { + continue; + } + + if (!root.has(attr.name())) { + continue; + } + + switch (field.getType().getSimpleName()) { + case "boolean": + field.set(this, root.getBoolean(attr.name())); + break; + case 
"int": + field.set(this, root.getInt(attr.name())); + break; + case "long": + field.set(this, root.getLong(attr.name())); + break; + case "float": + field.set(this, root.getFloat(attr.name())); + break; + case "double": + field.set(this, root.getDouble(attr.name())); + break; + case "String": + field.set(this, root.getString(attr.name())); + break; + default: + // Unsupported type variable. + throw new IOException("invalid type: " + field.getType().getSimpleName()); + } + } + } catch (Exception e) { + throw new IOException("failed to read session variable: " + e.getMessage()); } } } diff --git a/fe/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/src/main/java/org/apache/doris/qe/VariableMgr.java index 81ba0efa85..80057bd067 100644 --- a/fe/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -204,7 +204,6 @@ public class VariableMgr { } } - // Get from show name to field public static void setVar(SessionVariable sessionVariable, SetVar setVar) throws DdlException { VarContext ctx = ctxByVarName.get(setVar.getVariable()); @@ -239,13 +238,14 @@ public class VariableMgr { wlock.unlock(); } writeGlobalVariableUpdate(globalSessionVariable, "update global variables"); + } else { + // set global variable should not affect variables of current session. + // global variable will only make effect when connecting in. + setValue(sessionVariable, ctx.getField(), value); } - - // whether it is session or global, set variables in current session, to make it effective. 
- setValue(sessionVariable, ctx.getField(), value); } - // global variable persisitence + // global variable persistence public static void write(DataOutputStream out) throws IOException { globalSessionVariable.write(out); } diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index faa49873d5..02b0fd9ae9 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -59,7 +59,7 @@ public class StreamLoadTask { private String partitions; private String path; private boolean negative; - private boolean strictMode = true; + private boolean strictMode = false; // default is false private String timezone = TimeUtils.DEFAULT_TIME_ZONE; private int timeout = Config.stream_load_default_timeout_second; private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB diff --git a/fe/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/src/test/java/org/apache/doris/qe/VariableMgrTest.java index 9ac223db24..2ee5ee865b 100644 --- a/fe/src/test/java/org/apache/doris/qe/VariableMgrTest.java +++ b/fe/src/test/java/org/apache/doris/qe/VariableMgrTest.java @@ -90,19 +90,19 @@ public class VariableMgrTest { // Set global variable SetVar setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(1234L)); VariableMgr.setVar(var, setVar); - Assert.assertEquals(1234L, var.getMaxExecMemByte()); + Assert.assertEquals(2147483648L, var.getMaxExecMemByte()); var = VariableMgr.newSessionVariable(); Assert.assertEquals(1234L, var.getMaxExecMemByte()); SetVar setVar2 = new SetVar(SetType.GLOBAL, "parallel_fragment_exec_instance_num", new IntLiteral(5L)); VariableMgr.setVar(var, setVar2); - Assert.assertEquals(5L, var.getParallelExecInstanceNum()); + Assert.assertEquals(1L, var.getParallelExecInstanceNum()); var = VariableMgr.newSessionVariable(); Assert.assertEquals(5L, var.getParallelExecInstanceNum()); SetVar 
setVar3 = new SetVar(SetType.GLOBAL, "time_zone", new StringLiteral("Asia/Shanghai")); VariableMgr.setVar(var, setVar3); - Assert.assertEquals("Asia/Shanghai", var.getTimeZone()); + Assert.assertEquals("CST", var.getTimeZone()); var = VariableMgr.newSessionVariable(); Assert.assertEquals("Asia/Shanghai", var.getTimeZone()); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f42339cb13..ed7db3726c 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -414,6 +414,8 @@ struct TMasterOpRequest { 9: optional string time_zone 10: optional i64 stmt_id 11: optional i64 sqlMode + 12: optional i64 loadMemLimit + 13: optional bool enableStrictMode } struct TColumnDefinition { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4aa70aae06..4f7ac35807 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -126,6 +126,9 @@ struct TQueryOptions { // multithreaded degree of intra-node parallelism 27: optional i32 mt_dop = 0; + // if this is a query option for LOAD, load_mem_limit should be set to limit the mem consumption + // of load channel. + 28: optional i64 load_mem_limit = 0; } // A scan range plus the parameters needed to execute that scan.