Fix some bugs (#979)
1. Add Config.max_routine_load_concurrent_task_num instead of the old one 2. Fix a bug that SHOW ALTER TABLE COLUMN may throw Nullpointer exception 3. Fix some misspelling of docs
This commit is contained in:
5
build.sh
5
build.sh
@ -141,10 +141,7 @@ fi
|
||||
# Build docs, should be built before Frontend
|
||||
echo "Build docs"
|
||||
cd ${DORIS_HOME}/docs
|
||||
if [ ${CLEAN} -eq 1 ]; then
|
||||
make clean
|
||||
fi
|
||||
make
|
||||
make clean && make
|
||||
cd ${DORIS_HOME}
|
||||
|
||||
# Clean and build Frontend
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
# 例行导入使用手册
|
||||
|
||||
例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。本文档主要介绍该功能的实现原理、使用方式以及最佳实践。
|
||||
例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。
|
||||
|
||||
本文档主要介绍该功能的实现原理、使用方式以及最佳实践。
|
||||
|
||||
## 名词解释
|
||||
|
||||
@ -39,7 +41,15 @@
|
||||
|
||||
```
|
||||
|
||||
如上图,Client 向 FE 提交一个例行导入作业。FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。
|
||||
如上图,Client 向 FE 提交一个例行导入作业。
|
||||
|
||||
FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
|
||||
|
||||
在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
|
||||
|
||||
FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
|
||||
|
||||
整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。
|
||||
|
||||
## Kafka 例行导入
|
||||
|
||||
@ -53,28 +63,29 @@
|
||||
|
||||
### 创建例行导入任务
|
||||
|
||||
创建例行导入任务的的详细语法可以参照 [这里]()。或者连接到 Doris 后,执行 `HELP ROUTINE LOAD;` 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。
|
||||
创建例行导入任务的的详细语法可以参照 [这里]()。或者连接到 Doris 后,执行 `HELP CREATE ROUTINE LOAD;` 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。
|
||||
|
||||
* columns_mapping
|
||||
|
||||
columns_mapping 主要用于指定表结构和 message 中的列映射关系,以及一些列的转换。如果不指定,Doris 会默认 message 中的列和表结构的列按顺序一一对应。虽然在正常情况下,如果源数据正好一一对应,则不指定也可以进行正常的数据导入。但是我们依然强烈建议用户**显示的指定列映射关系**。这样当表结构发生变化(比如增加一个 nullable 的列),或者源文件发生变化(比如增加了一列)时,导入任务依然可以继续进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。
|
||||
`columns_mapping` 主要用于指定表结构和 message 中的列映射关系,以及一些列的转换。如果不指定,Doris 会默认 message 中的列和表结构的列按顺序一一对应。虽然在正常情况下,如果源数据正好一一对应,则不指定也可以进行正常的数据导入。但是我们依然强烈建议用户**显式的指定列映射关系**。这样当表结构发生变化(比如增加一个 nullable 的列),或者源文件发生变化(比如增加了一列)时,导入任务依然可以继续进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。
|
||||
|
||||
在 columns_mapping 中我们同样可以使用一些内置函数进行列的转换。但需要注意函数对应的实际列类型。举例说明:
|
||||
在 `columns_mapping` 中我们同样可以使用一些内置函数进行列的转换。但需要注意函数参数对应的实际列类型。举例说明:
|
||||
|
||||
假设用户需要导入只包含 `k1` 一列的表,列类型为 int。并且需要将源文件中的 null 值转换为 0。该功能可以通过 `ifnull` 函数实现。正确是的使用方式如下:
|
||||
假设用户需要导入只包含 `k1` 一列的表,列类型为 `int`。并且需要将源文件中的 null 值转换为 0。该功能可以通过 `ifnull` 函数实现。正确是的使用方式如下:
|
||||
|
||||
`COLUMNS (xx, k1=ifnull(xx, "3"))`
|
||||
|
||||
注意这里我们使用 `"3"` 而不是 `3`。因为对于导入任务来说,源数据中的列类型都为 `varchar`,所以这里 `xx` 虚拟列的类型也为 `varchar`。所以我们需要使用 `"3"` 来进行对应的匹配,否则 `ifnull` 函数无法找到参数为 `(varchar, int)` 的函数签名,将出现错误。
|
||||
注意这里我们使用 `"3"` 而不是 `3`,虽然 `k1` 的类型为 `int`。因为对于导入任务来说,源数据中的列类型都为 `varchar`,所以这里 `xx` 虚拟列的类型也为 `varchar`。所以我们需要使用 `"3"` 来进行对应的匹配,否则 `ifnull` 函数无法找到参数为 `(varchar, int)` 的函数签名,将出现错误。
|
||||
|
||||
* desired\_concurrent\_number
|
||||
|
||||
desired_concurrent_number 用于指定一个例行作业期望的并发度。即一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:
|
||||
`desired_concurrent_number` 用于指定一个例行作业期望的并发度。即一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:
|
||||
|
||||
`Min(partition num / 3, desired_concurrent_number, alive_backend_num, DEFAULT_TASK_MAX_CONCURRENT_NUM)`
|
||||
其中DEFAULT_TASK_MAX_CONCURRENT_NUM是系统的一个默认的最大并发数限制。
|
||||
`Min(partition num / 3, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)`
|
||||
|
||||
其中 `Config.max_routine_load_task_concurrrent_num` 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。
|
||||
|
||||
其中 partition 值订阅的 Kafka topic 的 partition数量。`alive_backend_num` 是当前正常的 BE 节点数。
|
||||
其中 partition num 指订阅的 Kafka topic 的 partition 数量。`alive_backend_num` 是当前正常的 BE 节点数。
|
||||
|
||||
* max\_batch\_interval/max\_batch\_rows/max\_batch\_size
|
||||
|
||||
@ -94,9 +105,9 @@
|
||||
|
||||
`max_error_number` 用于控制错误率。在错误率过高的时候,作业会自动暂停。因为整个作业是面向数据流的,因为数据流的无边界性,我们无法像其他导入任务一样,通过一个错误比例来计算错误率。因此这里提供了一种新的计算方式,来计算数据流中的错误比例。
|
||||
|
||||
我们设定了一个采样窗口。窗口的大小为 `max_batch_rows * 10`。在一个采样窗口内,如果错误行数超过 `max_error_number`,则作业被暂定。如果没有超过,则下一个窗口重新开始计算错误行数。
|
||||
我们设定了一个采样窗口。窗口的大小为 `max_batch_rows * 10`。在一个采样窗口内,如果错误行数超过 `max_error_number`,则作业被暂停。如果没有超过,则下一个窗口重新开始计算错误行数。
|
||||
|
||||
我们假设 `max_error_number` 为 200000,则窗口大小为 2000000。设 `max_error_number` 为 20000,即用户预期每 2000000 行的错误行为 20000。即错误率为 1%。但是因为不是每批次任务正好消费 200000 行,所以窗口的实际范围是 [2000000, 2200000],即有 10% 的统计误差。
|
||||
我们假设 `max_batch_rows` 为 200000,则窗口大小为 2000000。设 `max_error_number` 为 20000,即用户预期每 2000000 行的错误行为 20000。即错误率为 1%。但是因为不是每批次任务正好消费 200000 行,所以窗口的实际范围是 [2000000, 2200000],即有 10% 的统计误差。
|
||||
|
||||
错误行不包括通过 where 条件过滤掉的行。但是包括没有对应的 Doris 表中的分区的行。
|
||||
|
||||
@ -104,31 +115,31 @@
|
||||
|
||||
`data_source_properties` 中可以指定消费具体的 Kakfa partition。如果不指定,则默认消费所订阅的 topic 的所有 partition。
|
||||
|
||||
注意,当显示的指定了 partition,则导入作业不会再动态的检测 Kafka partition 的变化。如果没有指定,则会根据 kafka partition 的变化,动态消费 partition。
|
||||
注意,当显式的指定了 partition,则导入作业不会再动态的检测 Kafka partition 的变化。如果没有指定,则会根据 kafka partition 的变化,动态调整需要消费的 partition。
|
||||
|
||||
### 查看导入作业状态
|
||||
|
||||
查看作业状态的具体命令和示例可以通过 `help show routine load;` 命令查看。
|
||||
查看**作业**状态的具体命令和示例可以通过 `HELP SHOW ROUTINE LOAD;` 命令查看。
|
||||
|
||||
查看任务运行状态的具体命令和示例可以通过 `help show routine load task;` 命令查看。
|
||||
查看**任务**运行状态的具体命令和示例可以通过 `HELP SHOW ROUTINE LOAD TASK;` 命令查看。
|
||||
|
||||
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
|
||||
|
||||
### 作业控制
|
||||
|
||||
用户可以通过 `STOP/PAUSE/RESUME` 三个命令来控制作业的停止,暂停和重启。可以通过 `help stop routine load;`, `help pause routine load;` 以及 `help resume routine load;` 三个命令查看帮助和示例。
|
||||
用户可以通过 `STOP/PAUSE/RESUME` 三个命令来控制作业的停止,暂停和重启。可以通过 `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;` 以及 `HELP RESUME ROUTINE LOAD;` 三个命令查看帮助和示例。
|
||||
|
||||
## 其他说明
|
||||
|
||||
1. 例行导入作业和 ALTER TABLE 操作的关系
|
||||
|
||||
* 例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂定。建议通过在例行导入作业中显式指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。
|
||||
* 删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂定
|
||||
* 例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。
|
||||
* 删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂停。
|
||||
|
||||
2. 例行导入作业和其他导入作业的关系(LOAD, DELETE, INSERT)
|
||||
|
||||
* 例行导入和其他 LOAD 作业以及 INSERT 操作没有冲突。
|
||||
* 当前 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。
|
||||
* 当执行 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。
|
||||
|
||||
3. 例行导入作业和 DROP DATABASE/TABLE 操作的关系
|
||||
|
||||
|
||||
@ -190,7 +190,7 @@
|
||||
);
|
||||
|
||||
## keyword
|
||||
ROUTINE,LOAD
|
||||
CREATE,ROUTINE,LOAD
|
||||
|
||||
# PAUSE ROUTINE LOAD
|
||||
|
||||
|
||||
@ -1106,7 +1106,6 @@ public class SchemaChangeJob extends AlterJob {
|
||||
jobInfo.add(state.name()); // job state
|
||||
jobInfo.add("N/A"); // progress
|
||||
jobInfo.add(cancelMsg);
|
||||
|
||||
jobInfos.add(jobInfo);
|
||||
return;
|
||||
}
|
||||
@ -1122,33 +1121,38 @@ public class SchemaChangeJob extends AlterJob {
|
||||
|
||||
// calc progress and state for each table
|
||||
for (Long indexId : changedIndexIdToSchemaVersion.keySet()) {
|
||||
int totalReplicaNum = 0;
|
||||
int finishedReplicaNum = 0;
|
||||
String idxState = IndexState.NORMAL.name();
|
||||
for (Partition partition : tbl.getPartitions()) {
|
||||
MaterializedIndex index = partition.getIndex(indexId);
|
||||
|
||||
if (state == JobState.RUNNING) {
|
||||
int tableReplicaNum = getTotalReplicaNumByIndexId(indexId);
|
||||
int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId);
|
||||
Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1));
|
||||
Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum,
|
||||
tableFinishedReplicaNum + "/" + tableReplicaNum);
|
||||
totalReplicaNum += tableReplicaNum;
|
||||
finishedReplicaNum += tableFinishedReplicaNum;
|
||||
}
|
||||
|
||||
if (index.getState() != IndexState.NORMAL) {
|
||||
idxState = index.getState().name();
|
||||
}
|
||||
}
|
||||
|
||||
indexState.put(indexId, idxState);
|
||||
|
||||
if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) {
|
||||
indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%");
|
||||
if (tbl.getIndexNameById(indexId) == null) {
|
||||
// this index may be dropped, and this should be a FINISHED job, just use a dummy info to show
|
||||
indexState.put(indexId, IndexState.NORMAL.name());
|
||||
indexProgress.put(indexId, "100%");
|
||||
} else {
|
||||
indexProgress.put(indexId, "0%");
|
||||
int totalReplicaNum = 0;
|
||||
int finishedReplicaNum = 0;
|
||||
String idxState = IndexState.NORMAL.name();
|
||||
for (Partition partition : tbl.getPartitions()) {
|
||||
MaterializedIndex index = partition.getIndex(indexId);
|
||||
if (state == JobState.RUNNING) {
|
||||
int tableReplicaNum = getTotalReplicaNumByIndexId(indexId);
|
||||
int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId);
|
||||
Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1));
|
||||
Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum,
|
||||
tableFinishedReplicaNum + "/" + tableReplicaNum);
|
||||
totalReplicaNum += tableReplicaNum;
|
||||
finishedReplicaNum += tableFinishedReplicaNum;
|
||||
}
|
||||
|
||||
if (index.getState() != IndexState.NORMAL) {
|
||||
idxState = index.getState().name();
|
||||
}
|
||||
}
|
||||
|
||||
indexState.put(indexId, idxState);
|
||||
|
||||
if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) {
|
||||
indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%");
|
||||
} else {
|
||||
indexProgress.put(indexId, "0%");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1159,7 +1163,7 @@ public class SchemaChangeJob extends AlterJob {
|
||||
jobInfo.add(tbl.getName());
|
||||
jobInfo.add(TimeUtils.longToTimeString(createTime));
|
||||
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
|
||||
jobInfo.add(tbl.getIndexNameById(indexId)); // index name
|
||||
jobInfo.add(tbl.getIndexNameById(indexId) == null ? "N/A" : tbl.getIndexNameById(indexId)); // index name
|
||||
jobInfo.add(indexId);
|
||||
// index schema version and schema hash
|
||||
jobInfo.add(changedIndexIdToSchemaVersion.get(indexId) + "-" + changedIndexIdToSchemaHash.get(indexId));
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -279,7 +280,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY),
|
||||
RoutineLoadJob.DEFAULT_TASK_MAX_CONCURRENT_NUM, DESIRED_CONCURRENT_NUMBER_PRED,
|
||||
Config.max_routine_load_task_concurrent_num, DESIRED_CONCURRENT_NUMBER_PRED,
|
||||
DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue();
|
||||
|
||||
maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY),
|
||||
|
||||
@ -239,9 +239,9 @@ public class InsertStmt extends DdlStmt {
|
||||
String jobLabel = "insert_" + uuid;
|
||||
LoadJobSourceType sourceType = isStreaming ? LoadJobSourceType.INSERT_STREAMING
|
||||
: LoadJobSourceType.FRONTEND;
|
||||
long timeoutMs = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000;
|
||||
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
|
||||
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
|
||||
jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutMs);
|
||||
jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
|
||||
if (isStreaming) {
|
||||
OlapTableSink sink = (OlapTableSink) dataSink;
|
||||
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
|
||||
@ -18,8 +18,8 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
|
||||
@ -119,4 +119,13 @@ public class ShowTabletStmt extends ShowStmt {
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
|
||||
return RedirectStatus.FORWARD_NO_SYNC;
|
||||
} else {
|
||||
return RedirectStatus.NO_FORWARD;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,11 +76,11 @@ public class CatalogRecycleBin extends Daemon implements Writable {
|
||||
// erase db with same name
|
||||
eraseDatabaseWithSameName(db.getFullName());
|
||||
|
||||
// recylce db
|
||||
// recycle db
|
||||
RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames);
|
||||
idToDatabase.put(db.getId(), databaseInfo);
|
||||
idToRecycleTime.put(db.getId(), System.currentTimeMillis());
|
||||
LOG.info("recycle db[{}]", db.getId());
|
||||
LOG.info("recycle db[{}-{}]", db.getId(), db.getFullName());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -93,11 +93,11 @@ public class CatalogRecycleBin extends Daemon implements Writable {
|
||||
// erase table with same name
|
||||
eraseTableWithSameName(dbId, table.getName());
|
||||
|
||||
// recylce table
|
||||
// recycle table
|
||||
RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
|
||||
idToRecycleTime.put(table.getId(), System.currentTimeMillis());
|
||||
idToTable.put(table.getId(), tableInfo);
|
||||
LOG.info("recycle table[{}]", table.getId());
|
||||
LOG.info("recycle table[{}-{}]", table.getId(), table.getName());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -112,12 +112,12 @@ public class CatalogRecycleBin extends Daemon implements Writable {
|
||||
// erase partition with same name
|
||||
erasePartitionWithSameName(dbId, tableId, partition.getName());
|
||||
|
||||
// recylce partition
|
||||
// recycle partition
|
||||
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition,
|
||||
range, dataProperty, replicationNum);
|
||||
idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
|
||||
idToPartition.put(partition.getId(), partitionInfo);
|
||||
LOG.info("recycle partition[{}]", partition.getId());
|
||||
LOG.info("recycle partition[{}-{}]", partition.getId(), partition.getName());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -775,5 +775,11 @@ public class Config extends ConfigBase {
|
||||
* If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval
|
||||
*/
|
||||
@ConfField public static boolean enable_metric_calculator = false;
|
||||
|
||||
/*
|
||||
* the max concurrent task num of a routine load task
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int max_routine_load_task_concurrent_num = 5;
|
||||
}
|
||||
|
||||
|
||||
@ -3138,7 +3138,7 @@ public class Load {
|
||||
loadDeleteJob.setState(JobState.LOADING);
|
||||
long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel,
|
||||
"FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
|
||||
Config.stream_load_default_timeout_second * 1000);
|
||||
Config.stream_load_default_timeout_second);
|
||||
loadDeleteJob.setTransactionId(transactionId);
|
||||
// the delete job will be persist in editLog
|
||||
addLoadJob(loadDeleteJob, db);
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
|
||||
import org.apache.doris.analysis.CreateRoutineLoadStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.LoadException;
|
||||
@ -141,11 +142,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
}
|
||||
|
||||
LOG.info("current concurrent task number is min"
|
||||
+ "(max partition division {}, desire task concurrent num {}, alive be num {})",
|
||||
maxPartitionDivision, desireTaskConcurrentNum, aliveBeNum);
|
||||
+ "(max partition division {}, desire task concurrent num {}, alive be num {}, config: {})",
|
||||
maxPartitionDivision, desireTaskConcurrentNum, aliveBeNum, Config.max_routine_load_task_concurrent_num);
|
||||
currentTaskConcurrentNum =
|
||||
Math.min(Math.min(maxPartitionDivision, Math.min(desireTaskConcurrentNum, aliveBeNum)),
|
||||
DEFAULT_TASK_MAX_CONCURRENT_NUM);
|
||||
Config.max_routine_load_task_concurrent_num);
|
||||
return currentTaskConcurrentNum;
|
||||
}
|
||||
|
||||
|
||||
@ -86,7 +86,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable {
|
||||
private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class);
|
||||
|
||||
public static final int DEFAULT_TASK_MAX_CONCURRENT_NUM = 3;
|
||||
public static final long DEFAULT_MAX_ERROR_NUM = 0;
|
||||
|
||||
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
|
||||
|
||||
@ -22,12 +22,13 @@ import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.common.collect.BoundType;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Range;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -68,7 +69,7 @@ public class PartitionColumnFilter {
|
||||
}
|
||||
}
|
||||
|
||||
// selete the smaller bound
|
||||
// select the smaller bound
|
||||
public void setUpperBound(LiteralExpr newUpperBound, boolean newUpperBoundInclusive) {
|
||||
if (null == upperBound) {
|
||||
upperBound = newUpperBound;
|
||||
|
||||
@ -77,7 +77,7 @@ public abstract class LoadPendingTask extends MasterTask {
|
||||
if (job.getTransactionId() < 0) {
|
||||
long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(dbId,
|
||||
job.getLabel(), "FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
|
||||
job.getTimeoutSecond() * 1000);
|
||||
job.getTimeoutSecond());
|
||||
job.setTransactionId(transactionId);
|
||||
}
|
||||
createEtlRequest();
|
||||
|
||||
@ -108,7 +108,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
TransactionState transactionState = masterTransMgr.getTransactionState(transactionId);
|
||||
assertNotNull(transactionState);
|
||||
assertEquals(transactionId, transactionState.getTransactionId());
|
||||
@ -126,7 +126,7 @@ public class GlobalTransactionMgrTest {
|
||||
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
} catch (AnalysisException e) {
|
||||
e.printStackTrace();
|
||||
} catch (LabelAlreadyUsedException e) {
|
||||
@ -143,7 +143,7 @@ public class GlobalTransactionMgrTest {
|
||||
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
} catch (Exception e) {
|
||||
// TODO: handle exception
|
||||
}
|
||||
@ -156,7 +156,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
// commit a transaction
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
@ -197,7 +197,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
// commit a transaction with 1,2 success
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
@ -219,7 +219,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable2,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
@ -315,7 +315,7 @@ public class GlobalTransactionMgrTest {
|
||||
routineLoadTaskInfoList.add(routineLoadTaskInfo);
|
||||
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L,
|
||||
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
|
||||
Config.stream_load_default_timeout_second * 1000);
|
||||
Config.stream_load_default_timeout_second);
|
||||
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
|
||||
masterTransMgr.getListenerRegistry().register(routineLoadJob);
|
||||
// Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob);
|
||||
@ -381,7 +381,7 @@ public class GlobalTransactionMgrTest {
|
||||
routineLoadTaskInfoList.add(routineLoadTaskInfo);
|
||||
TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L,
|
||||
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
|
||||
Config.stream_load_default_timeout_second * 1000);
|
||||
Config.stream_load_default_timeout_second);
|
||||
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
|
||||
masterTransMgr.getListenerRegistry().register(routineLoadJob);
|
||||
Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
|
||||
@ -426,7 +426,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
// commit a transaction
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
@ -472,7 +472,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
// commit a transaction with 1,2 success
|
||||
TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId1);
|
||||
@ -526,7 +526,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable2,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
tabletCommitInfo1 = new TabletCommitInfo(CatalogTestUtil.testTabletId1, CatalogTestUtil.testBackendId1);
|
||||
TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(CatalogTestUtil.testTabletId1,
|
||||
CatalogTestUtil.testBackendId3);
|
||||
@ -594,7 +594,7 @@ public class GlobalTransactionMgrTest {
|
||||
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
|
||||
CatalogTestUtil.testTxnLable1,
|
||||
transactionSource,
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second * 1000);
|
||||
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
|
||||
TransactionState transactionState = masterTransMgr.getTransactionState(transactionId);
|
||||
assertNotNull(transactionState);
|
||||
assertEquals(transactionId, transactionState.getTransactionId());
|
||||
|
||||
Reference in New Issue
Block a user