diff --git a/build.sh b/build.sh index 76685f7e07..0217675cb3 100755 --- a/build.sh +++ b/build.sh @@ -141,10 +141,7 @@ fi # Build docs, should be built before Frontend echo "Build docs" cd ${DORIS_HOME}/docs -if [ ${CLEAN} -eq 1 ]; then - make clean -fi -make +make clean && make cd ${DORIS_HOME} # Clean and build Frontend diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md index 5b5cf2ded1..1cf0659073 100644 --- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md @@ -1,6 +1,8 @@ # 例行导入使用手册 -例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。本文档主要介绍该功能的实现原理、使用方式以及最佳实践。 +例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。 + +本文档主要介绍该功能的实现原理、使用方式以及最佳实践。 ## 名词解释 @@ -39,7 +41,15 @@ ``` -如上图,Client 向 FE 提交一个例行导入作业。FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。 +如上图,Client 向 FE 提交一个例行导入作业。 + +FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。 + +在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。 + +FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。 + +整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。 ## Kafka 例行导入 @@ -53,28 +63,29 @@ ### 创建例行导入任务 -创建例行导入任务的的详细语法可以参照 [这里]()。或者连接到 Doris 后,执行 `HELP ROUTINE LOAD;` 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。 +创建例行导入任务的的详细语法可以参照 [这里]()。或者连接到 Doris 后,执行 `HELP CREATE ROUTINE LOAD;` 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。 * columns_mapping - columns_mapping 主要用于指定表结构和 message 中的列映射关系,以及一些列的转换。如果不指定,Doris 会默认 message 中的列和表结构的列按顺序一一对应。虽然在正常情况下,如果源数据正好一一对应,则不指定也可以进行正常的数据导入。但是我们依然强烈建议用户**显示的指定列映射关系**。这样当表结构发生变化(比如增加一个 nullable 的列),或者源文件发生变化(比如增加了一列)时,导入任务依然可以继续进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。 + `columns_mapping` 主要用于指定表结构和 message 中的列映射关系,以及一些列的转换。如果不指定,Doris 会默认 message 中的列和表结构的列按顺序一一对应。虽然在正常情况下,如果源数据正好一一对应,则不指定也可以进行正常的数据导入。但是我们依然强烈建议用户**显式的指定列映射关系**。这样当表结构发生变化(比如增加一个 nullable 的列),或者源文件发生变化(比如增加了一列)时,导入任务依然可以继续进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。 - 在 columns_mapping 中我们同样可以使用一些内置函数进行列的转换。但需要注意函数对应的实际列类型。举例说明: + 在 `columns_mapping` 中我们同样可以使用一些内置函数进行列的转换。但需要注意函数参数对应的实际列类型。举例说明: - 假设用户需要导入只包含 `k1` 一列的表,列类型为 int。并且需要将源文件中的 null 值转换为 0。该功能可以通过 `ifnull` 函数实现。正确是的使用方式如下: + 假设用户需要导入只包含 `k1` 一列的表,列类型为 `int`。并且需要将源文件中的 null 值转换为 0。该功能可以通过 `ifnull` 函数实现。正确是的使用方式如下: `COLUMNS (xx, k1=ifnull(xx, "3"))` - 注意这里我们使用 `"3"` 而不是 `3`。因为对于导入任务来说,源数据中的列类型都为 `varchar`,所以这里 `xx` 虚拟列的类型也为 `varchar`。所以我们需要使用 `"3"` 来进行对应的匹配,否则 `ifnull` 函数无法找到参数为 `(varchar, int)` 的函数签名,将出现错误。 + 注意这里我们使用 `"3"` 而不是 `3`,虽然 `k1` 的类型为 `int`。因为对于导入任务来说,源数据中的列类型都为 `varchar`,所以这里 `xx` 虚拟列的类型也为 `varchar`。所以我们需要使用 `"3"` 来进行对应的匹配,否则 `ifnull` 函数无法找到参数为 `(varchar, int)` 的函数签名,将出现错误。 * desired\_concurrent\_number - desired_concurrent_number 用于指定一个例行作业期望的并发度。即一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下: + `desired_concurrent_number` 用于指定一个例行作业期望的并发度。即一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下: - `Min(partition num / 3, desired_concurrent_number, alive_backend_num, DEFAULT_TASK_MAX_CONCURRENT_NUM)` - 其中DEFAULT_TASK_MAX_CONCURRENT_NUM是系统的一个默认的最大并发数限制。 + `Min(partition num / 3, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)` + + 其中 `Config.max_routine_load_task_concurrrent_num` 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。 - 其中 partition 值订阅的 Kafka topic 的 partition数量。`alive_backend_num` 是当前正常的 BE 节点数。 + 其中 partition num 指订阅的 Kafka topic 的 partition 数量。`alive_backend_num` 是当前正常的 BE 节点数。 * max\_batch\_interval/max\_batch\_rows/max\_batch\_size @@ -94,9 +105,9 @@ `max_error_number` 用于控制错误率。在错误率过高的时候,作业会自动暂停。因为整个作业是面向数据流的,因为数据流的无边界性,我们无法像其他导入任务一样,通过一个错误比例来计算错误率。因此这里提供了一种新的计算方式,来计算数据流中的错误比例。 - 我们设定了一个采样窗口。窗口的大小为 `max_batch_rows * 10`。在一个采样窗口内,如果错误行数超过 `max_error_number`,则作业被暂定。如果没有超过,则下一个窗口重新开始计算错误行数。 + 我们设定了一个采样窗口。窗口的大小为 `max_batch_rows * 10`。在一个采样窗口内,如果错误行数超过 `max_error_number`,则作业被暂停。如果没有超过,则下一个窗口重新开始计算错误行数。 - 我们假设 `max_error_number` 为 200000,则窗口大小为 2000000。设 `max_error_number` 为 20000,即用户预期每 2000000 行的错误行为 20000。即错误率为 1%。但是因为不是每批次任务正好消费 200000 行,所以窗口的实际范围是 [2000000, 2200000],即有 10% 的统计误差。 + 我们假设 `max_batch_rows` 为 200000,则窗口大小为 2000000。设 `max_error_number` 为 20000,即用户预期每 2000000 行的错误行为 20000。即错误率为 1%。但是因为不是每批次任务正好消费 200000 行,所以窗口的实际范围是 [2000000, 2200000],即有 10% 的统计误差。 错误行不包括通过 where 条件过滤掉的行。但是包括没有对应的 Doris 表中的分区的行。 @@ -104,31 +115,31 @@ `data_source_properties` 中可以指定消费具体的 Kakfa partition。如果不指定,则默认消费所订阅的 topic 的所有 partition。 - 注意,当显示的指定了 partition,则导入作业不会再动态的检测 Kafka partition 的变化。如果没有指定,则会根据 kafka partition 的变化,动态消费 partition。 + 注意,当显式的指定了 partition,则导入作业不会再动态的检测 Kafka partition 的变化。如果没有指定,则会根据 kafka partition 的变化,动态调整需要消费的 partition。 ### 查看导入作业状态 -查看作业状态的具体命令和示例可以通过 `help show routine load;` 命令查看。 +查看**作业**状态的具体命令和示例可以通过 `HELP SHOW ROUTINE LOAD;` 命令查看。 -查看任务运行状态的具体命令和示例可以通过 `help show routine load task;` 命令查看。 +查看**任务**运行状态的具体命令和示例可以通过 `HELP SHOW ROUTINE LOAD TASK;` 命令查看。 只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。 ### 作业控制 -用户可以通过 `STOP/PAUSE/RESUME` 三个命令来控制作业的停止,暂停和重启。可以通过 `help stop routine load;`, `help pause routine load;` 以及 `help resume routine load;` 三个命令查看帮助和示例。 +用户可以通过 `STOP/PAUSE/RESUME` 三个命令来控制作业的停止,暂停和重启。可以通过 `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;` 以及 `HELP RESUME ROUTINE LOAD;` 三个命令查看帮助和示例。 ## 其他说明 1. 例行导入作业和 ALTER TABLE 操作的关系 - * 例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂定。建议通过在例行导入作业中显式指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。 - * 删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂定 + * 例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。 + * 删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂停。 2. 例行导入作业和其他导入作业的关系(LOAD, DELETE, INSERT) * 例行导入和其他 LOAD 作业以及 INSERT 操作没有冲突。 - * 当前 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。 + * 当执行 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。 3. 例行导入作业和 DROP DATABASE/TABLE 操作的关系 diff --git a/docs/help/Contents/Data Manipulation/routine_load.md b/docs/help/Contents/Data Manipulation/routine_load.md index 2345ebc00b..aa27e65324 100644 --- a/docs/help/Contents/Data Manipulation/routine_load.md +++ b/docs/help/Contents/Data Manipulation/routine_load.md @@ -190,7 +190,7 @@ ); ## keyword - ROUTINE,LOAD + CREATE,ROUTINE,LOAD # PAUSE ROUTINE LOAD diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index 95a5537afd..2e7ee376c9 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -1106,7 +1106,6 @@ public class SchemaChangeJob extends AlterJob { jobInfo.add(state.name()); // job state jobInfo.add("N/A"); // progress jobInfo.add(cancelMsg); - jobInfos.add(jobInfo); return; } @@ -1122,33 +1121,38 @@ public class SchemaChangeJob extends AlterJob { // calc progress and state for each table for (Long indexId : changedIndexIdToSchemaVersion.keySet()) { - int totalReplicaNum = 0; - int finishedReplicaNum = 0; - String idxState = IndexState.NORMAL.name(); - for (Partition partition : tbl.getPartitions()) { - MaterializedIndex index = partition.getIndex(indexId); - - if (state == JobState.RUNNING) { - int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); - int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); - Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); - Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, - tableFinishedReplicaNum + "/" + tableReplicaNum); - totalReplicaNum += tableReplicaNum; - finishedReplicaNum += tableFinishedReplicaNum; - } - - if (index.getState() != IndexState.NORMAL) { - idxState = index.getState().name(); - } - } - - indexState.put(indexId, idxState); - - if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) { - indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%"); + if (tbl.getIndexNameById(indexId) == null) { + // this index may be dropped, and this should be a FINISHED job, just use a dummy info to show + indexState.put(indexId, IndexState.NORMAL.name()); + indexProgress.put(indexId, "100%"); } else { - indexProgress.put(indexId, "0%"); + int totalReplicaNum = 0; + int finishedReplicaNum = 0; + String idxState = IndexState.NORMAL.name(); + for (Partition partition : tbl.getPartitions()) { + MaterializedIndex index = partition.getIndex(indexId); + if (state == JobState.RUNNING) { + int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); + int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); + Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); + Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, + tableFinishedReplicaNum + "/" + tableReplicaNum); + totalReplicaNum += tableReplicaNum; + finishedReplicaNum += tableFinishedReplicaNum; + } + + if (index.getState() != IndexState.NORMAL) { + idxState = index.getState().name(); + } + } + + indexState.put(indexId, idxState); + + if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) { + indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%"); + } else { + indexProgress.put(indexId, "0%"); + } } } @@ -1159,7 +1163,7 @@ public class SchemaChangeJob extends AlterJob { jobInfo.add(tbl.getName()); jobInfo.add(TimeUtils.longToTimeString(createTime)); jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add(tbl.getIndexNameById(indexId)); // index name + jobInfo.add(tbl.getIndexNameById(indexId) == null ? "N/A" : tbl.getIndexNameById(indexId)); // index name jobInfo.add(indexId); // index schema version and schema hash jobInfo.add(changedIndexIdToSchemaVersion.get(indexId) + "-" + changedIndexIdToSchemaHash.get(indexId)); diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 2af3b150ac..7d30d9975e 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -279,7 +280,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { } desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY), - RoutineLoadJob.DEFAULT_TASK_MAX_CONCURRENT_NUM, DESIRED_CONCURRENT_NUMBER_PRED, + Config.max_routine_load_task_concurrent_num, DESIRED_CONCURRENT_NUMBER_PRED, DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue(); maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY), diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index b89e948ac9..5c27198b46 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -239,9 +239,9 @@ public class InsertStmt extends DdlStmt { String jobLabel = "insert_" + uuid; LoadJobSourceType sourceType = isStreaming ? LoadJobSourceType.INSERT_STREAMING : LoadJobSourceType.FRONTEND; - long timeoutMs = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000; + long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), - jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutMs); + jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond); if (isStreaming) { OlapTableSink sink = (OlapTableSink) dataSink; TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowAlterStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowAlterStmt.java index d6868b3b9a..1bba28f30e 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowAlterStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowAlterStmt.java @@ -18,8 +18,8 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java index 3083580cf9..9de0213634 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java @@ -119,4 +119,13 @@ public class ShowTabletStmt extends ShowStmt { } return builder.build(); } + + @Override + public RedirectStatus getRedirectStatus() { + if (ConnectContext.get().getSessionVariable().getForwardToMaster()) { + return RedirectStatus.FORWARD_NO_SYNC; + } else { + return RedirectStatus.NO_FORWARD; + } + } } diff --git a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 7cc7d4820e..de489344f7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -76,11 +76,11 @@ public class CatalogRecycleBin extends Daemon implements Writable { // erase db with same name eraseDatabaseWithSameName(db.getFullName()); - // recylce db + // recycle db RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames); idToDatabase.put(db.getId(), databaseInfo); idToRecycleTime.put(db.getId(), System.currentTimeMillis()); - LOG.info("recycle db[{}]", db.getId()); + LOG.info("recycle db[{}-{}]", db.getId(), db.getFullName()); return true; } @@ -93,11 +93,11 @@ public class CatalogRecycleBin extends Daemon implements Writable { // erase table with same name eraseTableWithSameName(dbId, table.getName()); - // recylce table + // recycle table RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table); idToRecycleTime.put(table.getId(), System.currentTimeMillis()); idToTable.put(table.getId(), tableInfo); - LOG.info("recycle table[{}]", table.getId()); + LOG.info("recycle table[{}-{}]", table.getId(), table.getName()); return true; } @@ -112,12 +112,12 @@ public class CatalogRecycleBin extends Daemon implements Writable { // erase partition with same name erasePartitionWithSameName(dbId, tableId, partition.getName()); - // recylce partition + // recycle partition RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition, range, dataProperty, replicationNum); idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); idToPartition.put(partition.getId(), partitionInfo); - LOG.info("recycle partition[{}]", partition.getId()); + LOG.info("recycle partition[{}-{}]", partition.getId(), partition.getName()); return true; } diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 2d608305b8..2071232828 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -775,5 +775,11 @@ public class Config extends ConfigBase { * If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval */ @ConfField public static boolean enable_metric_calculator = false; + + /* + * the max concurrent task num of a routine load task + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_routine_load_task_concurrent_num = 5; } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 9154639193..91aac372d5 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -3138,7 +3138,7 @@ public class Load { loadDeleteJob.setState(JobState.LOADING); long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND, - Config.stream_load_default_timeout_second * 1000); + Config.stream_load_default_timeout_second); loadDeleteJob.setTransactionId(transactionId); // the delete job will be persist in editLog addLoadJob(loadDeleteJob, db); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index c1f012b00d..7d26179c8a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -20,6 +20,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LoadException; @@ -141,11 +142,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { } LOG.info("current concurrent task number is min" - + "(max partition division {}, desire task concurrent num {}, alive be num {})", - maxPartitionDivision, desireTaskConcurrentNum, aliveBeNum); + + "(max partition division {}, desire task concurrent num {}, alive be num {}, config: {})", + maxPartitionDivision, desireTaskConcurrentNum, aliveBeNum, Config.max_routine_load_task_concurrent_num); currentTaskConcurrentNum = Math.min(Math.min(maxPartitionDivision, Math.min(desireTaskConcurrentNum, aliveBeNum)), - DEFAULT_TASK_MAX_CONCURRENT_NUM); + Config.max_routine_load_task_concurrent_num); return currentTaskConcurrentNum; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0ae48acada..ba68f6c1e0 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -86,7 +86,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable { private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); - public static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3; public static final long DEFAULT_MAX_ERROR_NUM = 0; public static final long DEFAULT_MAX_INTERVAL_SECOND = 10; diff --git a/fe/src/main/java/org/apache/doris/planner/PartitionColumnFilter.java b/fe/src/main/java/org/apache/doris/planner/PartitionColumnFilter.java index 191354a5c6..6350267fc7 100644 --- a/fe/src/main/java/org/apache/doris/planner/PartitionColumnFilter.java +++ b/fe/src/main/java/org/apache/doris/planner/PartitionColumnFilter.java @@ -22,12 +22,13 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.AnalysisException; + import com.google.common.collect.BoundType; import com.google.common.collect.Lists; import com.google.common.collect.Range; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; @@ -68,7 +69,7 @@ public class PartitionColumnFilter { } } - // selete the smaller bound + // select the smaller bound public void setUpperBound(LiteralExpr newUpperBound, boolean newUpperBoundInclusive) { if (null == upperBound) { upperBound = newUpperBound; diff --git a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java index e5957c26a5..c46eca8e4a 100644 --- a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java @@ -77,7 +77,7 @@ public abstract class LoadPendingTask extends MasterTask { if (job.getTransactionId() < 0) { long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(dbId, job.getLabel(), "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND, - job.getTimeoutSecond() * 1000); + job.getTimeoutSecond()); job.setTransactionId(transactionId); } createEtlRequest(); diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index f601d27749..379dc4e981 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -108,7 +108,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); TransactionState transactionState = masterTransMgr.getTransactionState(transactionId); assertNotNull(transactionState); assertEquals(transactionId, transactionState.getTransactionId()); @@ -126,7 +126,7 @@ public class GlobalTransactionMgrTest { transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); } catch (AnalysisException e) { e.printStackTrace(); } catch (LabelAlreadyUsedException e) { @@ -143,7 +143,7 @@ public class GlobalTransactionMgrTest { transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); } catch (Exception e) { // TODO: handle exception } @@ -156,7 +156,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); // commit a transaction TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); @@ -197,7 +197,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); // commit a transaction with 1,2 success TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); @@ -219,7 +219,7 @@ public class GlobalTransactionMgrTest { long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable2, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3); @@ -315,7 +315,7 @@ public class GlobalTransactionMgrTest { routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), - Config.stream_load_default_timeout_second * 1000); + Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getListenerRegistry().register(routineLoadJob); // Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); @@ -381,7 +381,7 @@ public class GlobalTransactionMgrTest { routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(), - Config.stream_load_default_timeout_second * 1000); + Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getListenerRegistry().register(routineLoadJob); Map idToTransactionState = Maps.newHashMap(); @@ -426,7 +426,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); // commit a transaction TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); @@ -472,7 +472,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); // commit a transaction with 1,2 success TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); @@ -526,7 +526,7 @@ public class GlobalTransactionMgrTest { long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable2, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1); TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId3); @@ -594,7 +594,7 @@ public class GlobalTransactionMgrTest { long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, CatalogTestUtil.testTxnLable1, transactionSource, - LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000); + LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); TransactionState transactionState = masterTransMgr.getTransactionState(transactionId); assertNotNull(transactionState); assertEquals(transactionId, transactionState.getTransactionId());