Add administrator guide of load (#1488)
The catalogue of load docs: ---- load-manual.md ---- broker-load-manual.md ---- insert-into-manual.md ---- stream-load-manual.md This commit also changes max/min_stream_load_timeout to max/min_load_timeout. The old config named stream_load_timeout means the max timeout suited for all types of load. So the config name has been changed.
This commit is contained in:
@ -1,43 +1,386 @@
|
||||
# Broker Load
|
||||
Broker load 是一个异步的导入方式,主要支持的数据源有 HDFS, BOS, AFS。用户需要通过 Mysql client 创建 Broker load 导入,并通过查看导入命令检查导入结果。
|
||||
|
||||
## properties
|
||||
Broker load 主要适用于源数据在文件系统中的,或者是超大文件导入的需求。
|
||||
|
||||
### strict mode
|
||||
broker load导入可以开启strict mode模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode为开启。
|
||||
# 名词解释
|
||||
1. HDFS: Hadoop 分布式文件系统。用户的待导入数据可能存放的位置。
|
||||
2. BOS:百度云提供的对象存储系统,百度云上的 Palo 用户的待导入数据可能存放的位置。
|
||||
3. AFS:百度内部提供的超大规模文件系统,百度厂内的 Palo 用户的待导入数据可能存放的位置。
|
||||
4. plan:导入执行计划,BE 会执行导入执行计划将数据导入到 Doris 系统中。
|
||||
5. Broker:Doris 系统中负责读取外部文件系统数据的组件。
|
||||
6. backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。
|
||||
7. frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入 plan 生成和导入任务的调度工作。
|
||||
|
||||
*注意:strict mode 功能仅在新版本的broker load中有效,如果导入明确指定 ```"version" = "v1"``` 则没有此功能。*
|
||||
# 基本原理
|
||||
|
||||
strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
|
||||
用户在提交导入任务后,FE 会生成对应的 plan 并根据目前 BE 的个数和文件的大小,将 plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
|
||||
|
||||
1. 对于列类型转换来说,如果 strict\_mode 为true,则错误的数据将被filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
|
||||
BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,并且事务被 publish 后,导入完成。
|
||||
|
||||
2. 对于导入的某列包含函数变换的,导入的值和函数的结果一致,strict 对其不产生影响。(其中 strftime 等 broker 系统支持的函数也属于这类)。
|
||||
```
|
||||
+
|
||||
| user create broker load
|
||||
v
|
||||
+----+----+
|
||||
| |
|
||||
| FE |
|
||||
| |
|
||||
+----+----+
|
||||
|
|
||||
| BE etl and load the data
|
||||
+--------------------------+
|
||||
| | |
|
||||
+---v---+ +--v----+ +---v---+
|
||||
| | | | | |
|
||||
| BE | | BE | | BE |
|
||||
| | | | | |
|
||||
+---+---+ +---+---+ +----+--+
|
||||
| | |
|
||||
| | | pull data from broker
|
||||
+---v---+ +---v---+ +----v--+
|
||||
| | | | | |
|
||||
|Broker | |Broker | |Broker |
|
||||
| | | | | |
|
||||
+-------+--+ +---+---+ ++------+
|
||||
| | |
|
||||
| | |
|
||||
+-v------v---------v--+
|
||||
|HDFS/BOS/AFS cluster |
|
||||
| |
|
||||
+---------------------+
|
||||
|
||||
3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict 对其也不产生影响。
|
||||
+ 例如:如果类型是 decimal(1,0), 原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。
|
||||
```
|
||||
|
||||
### strict mode 与 source data 的导入关系
|
||||
# 基本操作
|
||||
## 创建导入
|
||||
|
||||
这里以列类型为 TinyInt 来举例
|
||||
Broker load 创建导入语句
|
||||
|
||||
注:当表中的列允许导入空值时
|
||||
```
|
||||
语法
|
||||
LOAD LABEL load_label
|
||||
(data_desc, ...)
|
||||
with broker broker_name broker_properties
|
||||
[PROPERTIES (key1=value1, ... )]
|
||||
|
||||
source data | source data example | string to int | strict_mode | load_data
|
||||
LABEL: db_name.label_name
|
||||
|
||||
data_desc:
|
||||
DATA INFILE ('file_path', ...)
|
||||
[NEGATIVE]
|
||||
INTO TABLE tbl_name
|
||||
[PARTITION (p1, p2)]
|
||||
[COLUMNS TERMINATED BY separator ]
|
||||
[(col1, ...)]
|
||||
[SET (k1=f1(xx), k2=f2(xx))]
|
||||
|
||||
broker_name: string
|
||||
|
||||
broker_properties: (key1=value1, ...)
|
||||
|
||||
示例:
|
||||
load label test.int_table_01
|
||||
(DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/id_name") into table id_name_table columns terminated by "," (tmp_c1,tmp_c2) set (id=tmp_c2, name=tmp_c1))
|
||||
with broker 'broker' ("username"="root", "password"="3trqDWfl")
|
||||
properties ("timeout" = "10000" );
|
||||
|
||||
```
|
||||
|
||||
创建导入的详细语法执行 ``` HELP BROKER LOAD;``` 查看语法帮助。这里主要介绍 Broker load 的创建导入语法中参数意义和注意事项。
|
||||
|
||||
+ label
|
||||
|
||||
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
|
||||
|
||||
label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At most once**
|
||||
|
||||
当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。
|
||||
|
||||
### 数据描述类参数
|
||||
|
||||
数据描述类参数主要指的是 Broker load 创建导入语句中的属于 ```data_desc``` 部分的参数。每组 ```data_desc ``` 主要表述了本次导入涉及到的数据源地址,ETL 函数,目标表及分区等信息。
|
||||
|
||||
下面主要对数据描述类的部分参数详细解释:
|
||||
|
||||
+ 多表导入
|
||||
|
||||
Broker load 支持一次导入任务涉及多张表,每个 Broker load 导入任务可在多个 ``` data_desc ``` 声明多张表来实现多表导入。每个单独的 ```data_desc``` 还可以指定属于该表的数据源地址。Broker load 保证了单次导入的多张表之间原子性成功或失败。
|
||||
|
||||
+ negative
|
||||
|
||||
```data_desc```中还可以设置数据取反导入。这个功能主要用于,当用户上次导入已经成功,但是想取消上次导入的数据时,就可以使用相同的源文件并设置导入 nagetive 取反功能。上次导入的数据就可以被撤销了。
|
||||
|
||||
该功能仅对聚合类型为 SUM 的列有效。聚合列数值在取反导入后会根据待导入数据值,不断递减。
|
||||
|
||||
+ Partition
|
||||
|
||||
在 ``` data_desc ``` 中可以指定待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 ```dpp.abnorm.ALL ```
|
||||
|
||||
### 导入任务参数
|
||||
|
||||
导入任务参数主要指的是 Broker load 创建导入语句中的属于 ```opt_properties```部分你的参数。导入任务参数是作用于整个导入任务的。
|
||||
|
||||
下面主要对导入任务参数的部分参数详细解释:
|
||||
|
||||
+ timeout
|
||||
|
||||
导入任务的超时时间(以秒为单位),用户可以在```opt_properties```中自行设置每个导入的超时时间。导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。Broker load 的默认导入超时时间为4小时。
|
||||
|
||||
通常情况下,用户不需要手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。
|
||||
|
||||
**推荐超时时间**
|
||||
|
||||
```
|
||||
总文件大小(MB) / 用户 Doris 集群最慢导入速度(MB/s) > timeout > ((总文件大小(MB) * 待导入的表及相关 Roll up 表的个数) / (10 * 导入并发数) )
|
||||
|
||||
导入并发数见文档最后的导入系统配置说明,公式中的 10 为目前的导入限速 10MB/s。
|
||||
|
||||
```
|
||||
|
||||
例如一个 1G 的待导入数据,待导入表涉及了3个 Roll up 表,当前的导入并发数为 3。则 timeout 的 最小值为 ```(1 * 1024 * 3 ) / (10 * 3) = 102 秒```
|
||||
|
||||
由于每个 Doris 集群的机器环境不同且集群并发的查询任务也不同,所以用户 Doris 集群的最慢导入速度需要用户自己根据历史的导入任务速度进行推测。
|
||||
|
||||
*注意:用户设置导入超时时间最好在上述范围内。如果设置的超时时间过长,可能会导致总导入任务的个数超过 Doris 系统限制,导致后续提交的导入被系统取消*
|
||||
|
||||
+ max\_filter\_ratio
|
||||
|
||||
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的 filter ratio 超过该值,则导入失败。计算公式为: ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )> max_filter_ratio ```
|
||||
|
||||
``` dpp.abnorm.ALL ``` 在代码中也叫 ``` num_rows_filtered``` 指的是导入过程中被过滤的错误数据。比如:列在表中为非空列,但是导入数据为空则为错误数据。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的错误数据量。
|
||||
|
||||
``` dpp.norm.ALL ``` 指的是导入过程中正确数据的条数。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的正确数据量。
|
||||
|
||||
``` 原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL ```
|
||||
|
||||
+ exec\_mem\_limit
|
||||
|
||||
导入任务的内存使用上限。当导入任务使用的内存超过设定上限时,导入任务会被 CANCEL。系统默认的内存上限为2G。
|
||||
|
||||
设定导入任务内存上限,可以保证各个导入任务之间以及导入任务和查询之间相互不影响。导入任务的内存上限不宜过大,过大可能会导致导入占满 be 内存,查询无法快速返回结果。
|
||||
|
||||
```导入任务内存使用量 < 文件大小 / 当前执行导入任务的并发数```
|
||||
|
||||
+ strict mode
|
||||
|
||||
Broker load 导入可以开启 strict mode模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode为开启。
|
||||
|
||||
strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
|
||||
|
||||
1. 对于列类型转换来说,如果 strict\_mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
|
||||
|
||||
2. 对于导入的某列包含函数变换的,导入的值和函数的结果一致,strict 对其不产生影响。(其中 strftime 等 Broker load 支持的函数也属于这类)。
|
||||
|
||||
3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict 对其也不产生影响。
|
||||
+ 例如:如果类型是 decimal(1,0), 原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。
|
||||
|
||||
#### strict mode 与 source data 的导入关系
|
||||
|
||||
这里以列类型为 TinyInt 来举例
|
||||
|
||||
注:当表中的列允许导入空值时
|
||||
|
||||
source data | source data example | string to int | strict_mode | load_data
|
||||
------------|---------------------|-----------------|--------------------|---------
|
||||
空值 | \N | N/A | true or false | NULL
|
||||
not null | aaa or 2000 | NULL | true | filtered
|
||||
not null | aaa | NULL | false | NULL
|
||||
not null | 1 | 1 | true or false | correct data
|
||||
|
||||
这里以列类型为 Decimal(1,0) 举例
|
||||
这里以列类型为 Decimal(1,0) 举例
|
||||
|
||||
注:当表中的列允许导入空值时
|
||||
注:当表中的列允许导入空值时
|
||||
|
||||
source data | source data example | string to int | strict_mode | load_data
|
||||
source data | source data example | string to int | strict_mode | load_data
|
||||
------------|---------------------|-----------------|--------------------|---------
|
||||
空值 | \N | N/A | true or false | NULL
|
||||
not null | aaa | NULL | true | filtered
|
||||
not null | aaa | NULL | false | NULL
|
||||
not null | 1 or 10 | 1 | true or false | correct data
|
||||
|
||||
*注意:10 虽然是一个超过范围的值,但是因为其类型符合decimal的要求,所以strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被strict mode过滤。*
|
||||
*注意:10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode过滤。*
|
||||
|
||||
## 查看导入
|
||||
|
||||
Broker load 导入方式由于是异步的,所以用户必须将创建导入的 Label 记录,并且在**查看导入命令中使用 Label 来查看导入结果**。查看导入命令在所有导入方式中是通用的,具体语法可执行 ``` HELP SHOW LOAD ```查看。
|
||||
|
||||
下面主要介绍了查看导入命令返回结果集中参数意义:
|
||||
|
||||
+ JobId
|
||||
|
||||
导入任务的唯一标识,每个导入任务的JobId都不同。与 Label 不同的是,JobId永远不会相同,而 Label 则可以在导入任务失败后被复用。
|
||||
|
||||
+ Label
|
||||
|
||||
导入任务的标识。
|
||||
|
||||
+ State
|
||||
|
||||
导入任务当前所处的阶段。在 Broker load 导入过程中主要会出现 PENDING 和 LOADING 这两个导入中的状态。如果 Broker load 处于 PENDING 状态,则说明当前导入任务正在等待被执行, LOADING 状态则表示正在执行中( transform and loading data)。
|
||||
|
||||
导入任务的最终阶段有两个: CANCELLED 和 FINISHED,当 Load job 处于这两个阶段时,导入完成。其中 CANCELLED 为导入失败,FINISHED 为导入成功。
|
||||
|
||||
+ Progress
|
||||
|
||||
导入任务的进度描述。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。目前 Broker load 由于只有 LOADING 阶段,所以 ETL 则会永远显示为 ```N/A ```
|
||||
|
||||
LOAD 的进度范围为:0~100%。``` LOAD 进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%```
|
||||
|
||||
**如果所有导入表均完成导入,此时 LOAD 的进度为 99%** 导入进入到最后生效阶段,整个导入完成后,LOAD 的进度才会改为 100%。
|
||||
|
||||
+ Type
|
||||
|
||||
导入任务的类型。Broker load 的 type 取值只有 BROKER。
|
||||
+ EtlInfo
|
||||
|
||||
主要显示了导入的数据量指标 ``` dpp.norm.ALL 和 dpp.abnorm.ALL```。用户可以根据这两个指标验证当前导入任务的 filter ratio 是否超过 max\_filter\_ratio.
|
||||
|
||||
+ TaskInfo
|
||||
|
||||
主要显示了当前导入任务参数,也就是创建 Broker load 导入任务时用户指定的导入任务参数,包括:cluster,timeout和max\_filter\_ratio。
|
||||
|
||||
+ ErrorMsg
|
||||
|
||||
在导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type和msg,如果导入任务成功则显示 ```N/A```。
|
||||
|
||||
```
|
||||
type的取值意义
|
||||
USER_CANCEL: 用户取消的任务
|
||||
ETL_RUN_FAIL:在ETL阶段失败的导入任务
|
||||
ETL_QUALITY_UNSATISFIED:数据质量不合格,也就是错误数据率超过了 max_filter_ratio
|
||||
LOAD_RUN_FAIL:在LOADING阶段失败的导入任务
|
||||
TIMEOUT:导入任务没在超时时间内完成
|
||||
UNKNOWN:未知的导入错误
|
||||
|
||||
```
|
||||
|
||||
|
||||
+ CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime
|
||||
|
||||
这几个值分别代表导入创建的时间,ETL阶段开始的时间,ETL阶段完成的时间,Loading阶段开始的时间和整个导入任务完成的时间。
|
||||
|
||||
Broker load 导入由于没有 ETL 阶段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被设置为同一个值。
|
||||
|
||||
导入任务长时间停留在CreateTime,而 LoadStartTime 为 N/A 则说明目前导入任务堆积严重。用户可减少导入提交的频率。
|
||||
|
||||
```
|
||||
LoadFinishTime - CreateTime = 整个导入任务所消耗时间
|
||||
LoadFinishTime - LoadStartTime = 整个 Broker load 导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间
|
||||
```
|
||||
|
||||
+ URL
|
||||
|
||||
导入任务的错误数据样例,访问 URL 地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL 字段则为 N/A。
|
||||
|
||||
## 取消导入
|
||||
|
||||
Broker load 可以被用户手动取消,取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 ``` HELP CANCEL LOAD ```查看。
|
||||
|
||||
# Broker load 导入系统配置
|
||||
## FE conf 中的配置
|
||||
|
||||
下面几个配置属于 Broker load 的系统级别配置,也就是作用于所有 Broker load 导入任务的配置。主要通过修改 ``` fe.conf```来调整配置值。
|
||||
|
||||
+ min\_bytes\_per\_broker\_scanner,max\_bytes\_per\_broker\_scanner 和 max\_broker\_concurrency
|
||||
|
||||
前两个配置限制了单个 BE 处理的数据量的最小和最大值。第三个配置限制了最大的导入并发数。最小处理的数据量,最大并发数,源文件的大小和当前集群 BE 的个数**共同决定了本次导入的并发数**。
|
||||
|
||||
```本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)```
|
||||
```本次导入单个BE的处理量 = 源文件大小/本次导入的并发数```
|
||||
|
||||
如果,``` 本次导入单个BE的处理量 > 最大处理的数据量```则导入任务会被取消。一般来说,导入的文件过大可能导致这个问题,通过上述公式,预先计算出单个 BE 的预计处理量并调整最大处理量即可解决问题。
|
||||
|
||||
```
|
||||
默认配置:
|
||||
参数名:min_bytes_per_broker_scanner, default 64MB,单位bytes
|
||||
参数名:max_broker_concurrency, default 10
|
||||
参数名:max_bytes_per_broker_scanner,default 3G,单位bytes
|
||||
|
||||
```
|
||||
|
||||
# Broker load 最佳实践
|
||||
|
||||
## 应用场景
|
||||
使用 Broker load 最适合的场景就是原始数据在文件系统(HDFS,BOS,AFS)中的场景。其次,由于 Broker load 是单次导入中唯一的一种异步导入的方式,所以如果用户在导入大文件中,需要使用异步接入,也可以考虑使用 Broker load。
|
||||
|
||||
## 数据量
|
||||
|
||||
这里仅讨论单个 BE 的情况,如果用户集群有多个 BE 则下面标题中的数据量应该乘以 BE 个数来计算。比如:如果用户有3个 BE,则 3G 以下(包含)则应该乘以 3,也就是 9G 以下(包含)。
|
||||
|
||||
+ 3G 以下(包含)
|
||||
|
||||
用户可以直接提交 Broker load 创建导入请求。
|
||||
|
||||
+ 3G 以上
|
||||
|
||||
由于单个导入 BE 最大的处理量为 3G,超过 3G 的待导入文件就需要通过调整 Broker load 的导入参数来实现大文件的导入。
|
||||
|
||||
1. 根据当前 BE 的个数和原始文件的大小修改单个 BE 的最大扫描量和最大并发数。
|
||||
|
||||
```
|
||||
修改 fe.conf 中配置
|
||||
|
||||
max_broker_concurrency = BE 个数
|
||||
当前导入任务单个 BE 处理的数据量 = 原始文件大小 / max_broker_concurrency
|
||||
max_bytes_per_broker_scanner >= 当前导入任务单个 BE 处理的数据量
|
||||
|
||||
比如一个 100G 的文件,集群的 BE 个数为 10个
|
||||
max_broker_concurrency = 10
|
||||
max_bytes_per_broker_scanner >= 10G = 100G / 10
|
||||
|
||||
```
|
||||
|
||||
修改后,所有的 BE 会并发的处理导入任务,每个 BE 处理原始文件的一部分。
|
||||
|
||||
*注意:上述两个 FE 中的配置均为系统配置,也就是说其修改是作用于所有的 Broker load的任务的。*
|
||||
|
||||
2. 在创建导入的时候自定义当前导入任务的 timeout 时间
|
||||
|
||||
```
|
||||
当前导入任务单个 BE 处理的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 当前导入任务的 timeout 时间 >= 当前导入任务单个 BE 处理的数据量 / 10M/s
|
||||
|
||||
比如一个 100G 的文件,集群的 BE 个数为 10个
|
||||
timeout >= 1000s = 10G / 10M/s
|
||||
|
||||
```
|
||||
|
||||
3. 当用户发现第二步计算出的 timeout 时间超过系统默认的导入最大超时时间 4小时
|
||||
|
||||
这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。
|
||||
|
||||
可以通过如下公式计算出 Doris 集群期望最大导入文件数据量:
|
||||
|
||||
```
|
||||
期望最大导入文件数据量 = 14400s * 10M/s * BE 个数
|
||||
比如:集群的 BE 个数为 10个
|
||||
期望最大导入文件数据量 = 14400 * 10M/s * 10 = 1440000M ≈ 1440G
|
||||
|
||||
注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。
|
||||
|
||||
```
|
||||
|
||||
## 完整例子
|
||||
|
||||
数据情况:用户数据在 HDFS 中,文件地址为 hdfs://abc.com:8888/store_sales, hdfs 的认证用户名为 root, 密码为 password, 数据量大小约为 30G,希望导入到数据库 bj_sales 的表 store_sales 中。
|
||||
|
||||
集群情况:集群的 BE 个数约为 3 个,集群中有 3 个 Broker 名称均为 broker。
|
||||
|
||||
+ step1: 经过上述方法的计算,本次导入的单个 BE 导入量为 10G,则需要先修改 FE 的配置,将单个 BE 导入最大量修改为:
|
||||
|
||||
```
|
||||
max_bytes_per_broker_scanner = 100000000000
|
||||
|
||||
```
|
||||
|
||||
+ step2: 经计算,本次导入的时间大约为 1000s,并未超过默认超时时间,可不配置导入自定义超时时间。
|
||||
|
||||
+ step3:创建导入语句
|
||||
|
||||
```
|
||||
load label store_sales_broker_load_01.bj_sales
|
||||
(DATA INFILE("hdfs://abc.com:8888/store_sales") into table store_sales
|
||||
with broker 'broker' ("username"="root", "password"="password")
|
||||
```
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,128 @@
|
||||
# Insert into
|
||||
|
||||
Insert into 主要用来将某几行数据插入某张表,或者将已经存在在 Doris 系统中的部分数据导入到目标表中。
|
||||
|
||||
目前所有的 Insert into 导入实现均为同步执行。但是为了兼容旧的 Insert into 导入,不指定 STREAMING 的 Insert into 同步执行完后,依然可以通过查看导入命令查看导入状态和结果。
|
||||
|
||||
# 基本操作
|
||||
## 创建导入
|
||||
|
||||
Insert into 创建导入请求需要通过 Mysql 协议提交,创建导入请求会同步返回导入结果。
|
||||
|
||||
Insert info 创建导入语句
|
||||
|
||||
```
|
||||
语法:
|
||||
INSERT INTO table_name [partition_info] [col_list] [plan_hints] [query_stmt] [VALUES]
|
||||
|
||||
示例:
|
||||
insert into char_4_new values ("qweasdzxcqweasdzxc"), ("a");
|
||||
insert into char_4_new select * from char_3_new;
|
||||
```
|
||||
|
||||
下面主要介绍创建导入语句中使用到的参数:
|
||||
|
||||
+ query\_stmt
|
||||
|
||||
通过一个查询语句,将查询语句的结果导入到 Doris 系统中的其他表。
|
||||
|
||||
+ VALUES
|
||||
|
||||
用户可以通过 VALUES 语法插入一条或者多条数据。
|
||||
|
||||
*注意:VALUES 方式仅适用于导入几条数据作为导入 DEMO 的情况,完全不适用于任何测试和生产环境。Doris 系统本身也不适合单条数据导入的场景。建议使用 Insert into select 的方式进行批量导入。*
|
||||
|
||||
+ partition\_info
|
||||
|
||||
导入表的目标分区,如果指定目标分区,则只会导入符合目标分区的数据。如果没有指定,则默认值为这张表的所有分区。所有不属于当前目标分区的数据均属于 ``` num_rows_unselected ```,不参与 ``` max_filter_ratio ``` 的计算。
|
||||
|
||||
+ col\_list
|
||||
|
||||
导入表的目标列,可以以任意的顺序存在。如果没有指定目标列,那么默认值是这张表的所有列。如果待表中的某个列没有存在目标列中,那么这个列需要有默认值,否则 INSERT 就会执行失败。
|
||||
|
||||
如果查询语句的结果列类型与目标列的类型不一致,那么会调用隐式类型转化,如果不能够进行转化,那么 INSERT 语句会报语法解析错误。
|
||||
|
||||
+ plan\_hints
|
||||
|
||||
可指定当前导入的导入方式,目前无论是否指定 hint 导入方式均为同步,用户均可以通过创建导入请求的结果判断导入结果。
|
||||
|
||||
唯一不同的是,不指定 STREAMING 的导入方式,由于为了兼容旧的使用习惯,用户依旧可以通过查看导入命令看到导入的结果。但导入实际上已经在创建导入返回结果的时候就已经完成了。
|
||||
|
||||
## 导入结果
|
||||
|
||||
Insert into 的导入结果也就是创建导入命令的返回值。
|
||||
|
||||
指定 plan\_hints 为 STREAMING 的 Insert into 导入直接根据 sql 执行的返回码确定导入是否成功,除了 query 返回 OK 以外,其他均为导入失败。
|
||||
|
||||
未指定 plan\_hints 的 Insert into 导入如果导入失败,也是返回 sql 执行失败,如果导入成功还会附加返回一个 Label 字段。
|
||||
|
||||
+ Label
|
||||
|
||||
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。Insert into 的 label 则是由系统生成的,用户可以拿着这个 label 通过查询导入命令异步获取导入状态。
|
||||
|
||||
*注意:只有不指定 plan\_hints 的 Insert into 才会返回 Label 参数*
|
||||
|
||||
# Insert into 系统配置
|
||||
## FE conf 中的配置
|
||||
|
||||
+ timeout
|
||||
|
||||
导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。
|
||||
|
||||
目前 Insert into 并不支持自定义导入的 timeout 时间,所有 Insert into 导入的超时时间是统一的,默认的 timeout 时间为1小时。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```insert_load_default_timeout_second```。
|
||||
|
||||
## Session variable 中的配置
|
||||
|
||||
+ enable\_insert\_strict
|
||||
|
||||
Insert into 导入本身不能设置 ```max_filter_ratio ```,这个参数用来代替错误率限制的功能,可以通过 ``` session variable ``` 设置,默认值为 false。当该参数设置为 false 时,则和```max_filter_ratio = 1 ``` 意义相同。如果该参数设置为 true 时,则和```max_filter_ratio = 0 ``` 意义相同。
|
||||
|
||||
# Insert into 最佳实践
|
||||
## 应用场景
|
||||
1. 用户希望仅导入几条假数据,验证一下 Doris 系统的功能。此时适合使用 Insert into VALUS 的语法。
|
||||
2. 用户希望将已经在 Doris 系统中的数据进行 ETL 转换并导入到一个新的 Doris 表中,此时适合使用 Insert into query\_stmt。
|
||||
|
||||
除以上两种情况外,其他情况并不适合使用 Insert into
|
||||
|
||||
## 数据量
|
||||
Insert into 对数据量没有限制,大数据量导入也可以支持。但 Insert into 有默认的超时时间,用户预估的导入数据量过大,就需要修改系统的 Insert into 导入超时时间。
|
||||
|
||||
```
|
||||
导入数据量 = 36G 约≤ 3600s * 10M/s
|
||||
其中 10M/s 是最大导入限速,用户需要根据当前集群情况计算出平均的导入速度来替换公式中的 10M/s
|
||||
```
|
||||
|
||||
## 完成例子
|
||||
用户有一张表 store_sales 在数据库 sales 中,用户又创建了一张表叫 bj_store_sales 也在数据库 sales 中,用户希望将 store_sales 中销售记录在 bj 的数据导入到这张新建的表 bj_store_sales 中。导入的数据量约为:10G。
|
||||
|
||||
```
|
||||
store_sales schema:
|
||||
(id, total, user_id, sale_timestamp, region)
|
||||
|
||||
bj_store_sales schema:
|
||||
(id, total, user_id, sale_timestamp)
|
||||
|
||||
```
|
||||
|
||||
集群情况:用户当前集群的平均导入速度约为 5M/s
|
||||
|
||||
+ Step1: 判断是否要修改 Insert into 的默认超时时间
|
||||
|
||||
```
|
||||
计算导入的大概时间
|
||||
10G / 5M/s = 2000s
|
||||
|
||||
修改 FE 配置
|
||||
insert_load_default_timeout_second = 2000
|
||||
```
|
||||
|
||||
+ Step2:创建导入任务
|
||||
|
||||
由于用户是希望将一张表中的数据做 ETL 并导入到目标表中,所以应该使用 Insert into query\_stmt 方式导入。
|
||||
|
||||
```
|
||||
insert into bj_store_sales select id, total, user_id, sale_timestamp from store_sales where region = "bj";
|
||||
```
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,176 @@
|
||||
# 导入
|
||||
导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过 Mysql 客户端查询数据。
|
||||
|
||||
本文档主要总体介绍导入简单原理,目前支持的几种导入方式,以及最佳实践。具体各个导入的操作手册请参阅各个导入的文档。
|
||||
|
||||
# 基本概念
|
||||
|
||||
1. 导入任务(Load job):导入任务读取用户提交的源数据,转换或清洗后,将数据导入到 Doris 系统中。导入完成后,数据即可被用户查询到。
|
||||
2. Backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。
|
||||
3. Frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入规划生成和导入任务的调度工作。
|
||||
4. Palo:Doris 系统在百度云上企业版售卖的名称。
|
||||
5. HDFS: Hadoop 分布式文件系统。用户的待导入数据可能存放的位置。
|
||||
6. BOS:百度云提供的对象存储系统,百度云上的 Palo 用户的待导入数据可能存放的位置。
|
||||
7. AFS:百度内部提供的超大规模文件系统,百度厂内的 Palo 用户的待导入数据可能存放的位置。
|
||||
8. Database(DB):Doris 系统中的 database。
|
||||
|
||||
# 基本原理
|
||||
## 导入执行流程
|
||||
|
||||
```
|
||||
+---------+ +---------+ +----------+ +-----------+
|
||||
| | | | | | | |
|
||||
|PENDING +----->+ETL +----->+LOADING +----->+FINISHED |
|
||||
| | | | | | | |
|
||||
+---------+ +---+-----+ +----+-----+ +-----------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| | +-----------+
|
||||
| | | |
|
||||
+-----------------+------------>CANCELLED |
|
||||
| |
|
||||
+-----------+
|
||||
|
||||
```
|
||||
|
||||
如上图,导入流程主要经过上面4个阶段。其中 PENDING 和 ETL 阶段不是必须的阶段。
|
||||
|
||||
+ PENDING: 该阶段只有 Broker load 才有。Broker load 被用户提交后会短暂停留在这个阶段,直到被 FE 中的 Scheduler 调度。 其中 Scheduler 的调度间隔为5秒。
|
||||
|
||||
+ ETL: 该阶段在版本0.11.0(包含)之前存在,主要是用于将原始数据按照用户声明的 transform 方式进行变换,并且过滤不满足条件的原始数据。在 0.11.0 后的版本,ETL 阶段不再存在,其中数据 transform 的工作被合并到 LOADING 阶段。
|
||||
|
||||
|
||||
+ LOADING: 该阶段在版本0.11.0(包含)之前主要用于将变换后的数据推到对应的 BE 存储中。在0.11.0后的版本,该阶段先对数据进行 ETL 清洗和变换,然后将数据发送到 BE 存储中。当所有导入数据均完成导入后,进入等待生效过程,此时 Load job 依旧是 LOADING。
|
||||
|
||||
|
||||
+ FINISHED: 在 Load job 涉及的所有数据均生效后,Load job 的状态变成 FINISHED。FINISHED 后导入的数据均可查询。
|
||||
|
||||
|
||||
+ CANCELLED: 在 ETL 或者 LOADING 的过程中,如果发生数据质量不合格等问题,Load Job 会被系统 cancel,本次导入会全部失败。当然,用户也可以手动取消 Load Job。CANCELLED 也是 Load Job 的最终状态,不可被再次执行。
|
||||
|
||||
|
||||
上述阶段,除了 PENDING 到 LOADING 阶段是 Scheduler 轮训调度的,其他阶段之前的转移都是回调机制实现。
|
||||
|
||||
# 导入方式
|
||||
|
||||
为适配不同的数据导入需求, Doris 系统提供了5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 数据格式。
|
||||
|
||||
每个导入方式的说明见单个导入方式的操作手册。此处只简单介绍并讲解如何选择。
|
||||
|
||||
1. Broker load: 是一种异步类型的导入方式,主要支持数据源有 HDFS, BOS, AFS 这三种。用户可通过 mysql client 创建导入。
|
||||
2. Stream load:是一种同步类型的导入方式,用户提交 HTTP 请求并携带原始数据创建导入。
|
||||
+ Mini load:Mini load 和 Stream load 一样,但从功能上并不支持列的函数变换等。建议直接使用 Stream load 即可。
|
||||
3. Insert into: 是一种同步类型的导入方式,用户可以指定将已经存在在 Doris 系统中的数据导入到另一个 Table 中。用户可通过 Mysql client 创建导入。
|
||||
4. Multi load:是一种异步类型的导入方式,在导入创建后可接收多次 Mini load 导入,并且原子性生效同一个 Multi load中的导入数据。
|
||||
5. Routine load:流式导入方式,用户指定流式数据源后,Doris 会循环不断从数据源中读取新数据并导入。目前支持的流式数据源为 Kafka。
|
||||
|
||||
## 如何选择合适的导入方式
|
||||
第一原则:根据数据源所在位置选择导入方式。比如:如果用户的原始数据存放在 HDFS 上,则应该选择 Broker load 导入方式。
|
||||
|
||||
# 基本操作
|
||||
这里主要介绍了导入的几种基本操作,以及基本操作的使用流程。用户在确定了导入方式后,可查看各个导入方式的操作手册获取更详细的操作参数意义。
|
||||
|
||||
## 创建导入
|
||||
用户提交导入任务,并且设置导入任务的参数。用户提交导入任务后,根据不同的导入方式,导入任务会被异步或同步的执行。
|
||||
|
||||
同步执行的导入比如:Stream load,任务会直接进入LOADING阶段。异步执行的导入方式比如: Broker load,任务会先进入PENDING阶段。(在导入操作使用流程中会详细解释同步和异步的区别)
|
||||
|
||||
## 查看导入
|
||||
用户提交成功导入后,可通过查看导入命令获取导入状态,以及详细信息。对于异步的导入方式来说,用户必须在提交导入后,异步查看导入进度。在 SHOW LOAD 中导入状态改为 FINISHED 时,导入才算完成。用户也可以通过该方式查看导入失败的原因,以及导入的各项指标。
|
||||
|
||||
执行 ``` HELP SHOW LOAD``` 可查看查看导入的语法帮助。
|
||||
|
||||
## 取消导入
|
||||
用户可以通过指定待取消的任务任务的 Label,来手动取消导入任务。导入任务被取消后,导入任务不会导入任何数据。只有未完成的导入任务才能被手动取消。
|
||||
|
||||
执行 ```HELP CANCEL LOAD``` 查看取消导入的语法。
|
||||
|
||||
一般情况下,提交的导入配置错误或者导入消耗时间过长,都可以使用这个命令取消导入。
|
||||
|
||||
## 导入操作使用流程
|
||||
Doris 目前的导入方式分为两类,同步和异步。如果是外部程序接入 Doris 的导入功能,需要判断使用导入方式是哪类再确定接入逻辑。
|
||||
|
||||
### 同步
|
||||
同步导入方式既用户创建导入任务,Doris 同步执行导入,执行完成后返回用户导入结果。用户可直接根据创建导入任务命令返回的结果同步判断导入是否成功。
|
||||
|
||||
同步类型的导入方式有: Stream load , Mini load, Insert into。
|
||||
|
||||
操作步骤:
|
||||
|
||||
1. 用户(外部系统)创建导入任务
|
||||
2. Doris 返回导入结果
|
||||
3. 用户(外部系统)判断导入结果,如果失败可以再次提交导入任务。
|
||||
|
||||
*注意:如果用户使用的导入方式是同步返回的,且导入的数据量过大,则创建导入请求可能会花很长时间才能返回结果。*
|
||||
|
||||
### 异步
|
||||
异步导入方式既用户创建导入任务后,Doris 直接返回创建成功。导入任务会被异步执行,用户需要再发查看导入请求来确定导入是否完成。
|
||||
|
||||
异步类型的导入方式有:Broker load,Multi load。
|
||||
|
||||
操作步骤:
|
||||
|
||||
1. 用户(外部系统)创建导入任务
|
||||
2. Doris 返回导入创建结果
|
||||
3. 用户(外部系统)判断导入创建结果,成功则进入4,失败回到重试创建导入,回到1。
|
||||
4. 用户(外部系统)轮训查看导入任务,直到 state 变为 FINISHED 或 CANCELLED。
|
||||
|
||||
**强调:异步类型的导入,必须通过异步查看导入任务来确定导入是否成功。仅仅创建导入成功,并不代表导入完成**
|
||||
|
||||
### 注意事项
|
||||
无论是异步还是同步的导入类型,都不应该在 Doris 返回导入失败或导入创建失败后,无休止的重试。**外部系统在有限次数重试并失败后,保留失败信息,大部分多次重试均失败问题都是使用方法问题或数据本身问题。**
|
||||
|
||||
# 最佳实践
|
||||
用户在接入 Doris 导入时,一般会采用程序接入的方式,这样可以保证数据被定期的导入到 Doris 中。下面主要说明了程序接入 Doris 的最佳实践。
|
||||
|
||||
1. 选择合适的导入方式:根据数据源所在位置选择导入方式。例如:如果原始数据存放在 BOS 或 HDFS 上,则使用 Broker load导入。
|
||||
2. 确定导入方式的协议:如果选择了 Broker load 导入方式,则外部系统需要能使用 Mysql 协议定期提交任务。
|
||||
3. 确定导入方式的类型:导入方式为同步或异步。比如 Broker load 为异步导入方式,则外部系统在提交创建导入后,必须调用 查看导入命令,根据查看导入命令的结果来判断导入是否成功。
|
||||
4. 制定 label 生成策略:label 生成策略需满足,每一批次数据唯一且固定的原则。这样 Doris 就可以保证 at most once。
|
||||
5. 程序自身保证 at least once:外部系统需要保证自身的 at least once,这样就可以导入流程的 exactly once。
|
||||
|
||||
# 导入通用系统配置
|
||||
下面主要解释了几个所有导入方式均通用的系统级别的配置。
|
||||
|
||||
## FE conf 中的配置
|
||||
下面两个配置属于 FE 的系统配置,可以通过修改 FE 的配置文件 ```fe.conf``` 来修改配置。
|
||||
|
||||
+ max\_load\_timeout\_second 和 min\_load\_timeout\_second
|
||||
|
||||
这两个配置含义为:最大的导入超时时间,最小的导入超时时间,以秒为单位。默认的最大超时时间为3天, 默认的最小超时时间为1秒。用户自定义的导入超时时间不可超过这个范围。该参数通用于所有的导入方式。
|
||||
|
||||
该配置对应的 ```fe.conf```中的参数名为 ```max_load_timeout_second, min_load_timeout_second ```。
|
||||
|
||||
+ desired\_max\_waiting\_jobs
|
||||
|
||||
在等待队列中的导入任务个数最大值,默认为100。当在 FE 中处于 PENDING 状态(也就是等待执行的)导入个数超过该值,新的导入请求则会被拒绝。
|
||||
|
||||
此配置仅对异步执行的导入有效,当异步执行的导入等待个数超过默认值,则会在创建导入请求的时候被拒绝。
|
||||
|
||||
该配置对应的 ```fe.conf```中的参数名为 ```desired_max_waiting_jobs```。
|
||||
|
||||
+ max\_running\_txn\_num\_per\_db
|
||||
|
||||
这个配置的含义是说,每个 DB 中正在运行的导入最大个数(不区分导入类型,统一计数)。当当前 DB 正在运行的导入个数超过最大值时,后续的导入则会直接被系统取消。
|
||||
|
||||
对于同步执行的导入来说,如果当前 DB 的正在运行导入个数超过最大值,则创建导入请求就会返回失败。
|
||||
|
||||
对于异步执行的导入来说,如果当前 DB 的正在运行导入个数超过最大值,则会在导入被正式调度到(也就是从 PENDING 状态准备改为 LOADING 状态)的时候被系统异步取消。
|
||||
|
||||
该配置对应的是 ```fe.conf``` 中的参数名为 ``` max_running_txn_num_per_db ```。
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,224 @@
|
||||
# Stream load
|
||||
Stream load 是一个同步的导入方式,用户通过发送 HTTP 请求将本地或内存中文件导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
|
||||
|
||||
Stream load 主要适用于待导入文件在发送导入请求端,或可被发送导入请求端获取的情况。
|
||||
|
||||
# 基本原理
|
||||
Stream load 是单机版本的导入,整个导入过程中参与 ETL 的节点只有一个 BE。由于导入后端是流式的所以并不会受单个 BE 的内存限制影响。
|
||||
|
||||
下图展示了 Stream load 的主要流程,省略了一些导入细节。
|
||||
|
||||
```
|
||||
^ +
|
||||
| |
|
||||
| | user creates stream load
|
||||
| 5. BE returns |
|
||||
| the result of v
|
||||
| stream load +---+-----+
|
||||
| | |
|
||||
| |FE |
|
||||
| | |
|
||||
| ++-+------+
|
||||
| 1.FE redirects| ^ 2. BE fetches the plan
|
||||
| request to be | | of stream load
|
||||
| | | ^
|
||||
| | | | 4. BE wants to effect
|
||||
| v | | the load data
|
||||
| ++-+---+--+
|
||||
| | +------+
|
||||
+--------------+BE | | 3. BE executes plan
|
||||
| +<-----+
|
||||
+---------+
|
||||
```
|
||||
|
||||
# 基本操作
|
||||
## 创建导入
|
||||
Stream load 创建导入语句
|
||||
|
||||
```
|
||||
语法:
|
||||
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
|
||||
|
||||
Header 中支持如下属性:
|
||||
label, column_separator, columns, where, max_filter_ratio, partitions
|
||||
格式为: -H "key1:value1"
|
||||
|
||||
示例:
|
||||
|
||||
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8888/api/test/date/_stream_load
|
||||
|
||||
```
|
||||
创建导入的详细语法帮助执行 ``` HELP STREAM LOAD;``` 查看, 下面主要介绍创建 Stream load 的部分参数意义。
|
||||
|
||||
### 签名参数
|
||||
|
||||
+ user/passwd
|
||||
|
||||
Stream load 由于创建导入的协议使用的是 HTTP 协议,所以需要通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
|
||||
|
||||
### 导入任务参数
|
||||
|
||||
Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。
|
||||
|
||||
+ label
|
||||
|
||||
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
|
||||
|
||||
label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At most once**
|
||||
|
||||
当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。
|
||||
|
||||
+ max\_filter\_ratio
|
||||
|
||||
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的 filter ratio 超过该值,则导入失败。计算公式为: ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )> max_filter_ratio ```
|
||||
|
||||
``` dpp.abnorm.ALL ``` 在代码中也叫 ``` num_rows_filtered``` 指的是导入过程中被过滤的错误数据。比如:列在表中为非空列,但是导入数据为空则为错误数据。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的错误数据量。
|
||||
|
||||
``` dpp.norm.ALL ``` 指的是导入过程中正确数据的条数。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的正确数据量。
|
||||
|
||||
``` num_rows_unselected ``` 导入过程中被 where 条件过滤掉的数据量。过滤的数据量不参与容忍率的计算。
|
||||
|
||||
``` 原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL + num_rows_unselected ```
|
||||
|
||||
+ where
|
||||
|
||||
导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参数 filter ratio 的计算,但会被计入``` num_rows_unselected ```。
|
||||
|
||||
+ partition
|
||||
|
||||
待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 ```dpp.abnorm.ALL ```
|
||||
|
||||
+ columns
|
||||
|
||||
待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
|
||||
|
||||
```
|
||||
列顺序变换例子:原始数据有两列,目前表也有两列(c1,c2)但是原始文件的第一列对应的是目标表的c2列, 而原始文件的第二列对应的是目标表的c1列,则写法如下:
|
||||
columns: c2,c1
|
||||
|
||||
表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:
|
||||
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = mouth(tmp_c2)
|
||||
其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
|
||||
|
||||
```
|
||||
|
||||
### 创建导入操作返回结果
|
||||
|
||||
由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
|
||||
|
||||
下面主要解释了 Stream load 导入结果参数:
|
||||
|
||||
+ NumberLoadedRows
|
||||
|
||||
和```dpp.norm.ALL ``` 含义相同,显示的是本次导入成功的条数。
|
||||
|
||||
+ NumberFilteredRows
|
||||
|
||||
和 ```dpp.abnorm.ALL ``` 含义相同,显示的是本次导入被过滤的错误条数。
|
||||
|
||||
+ NumberUnselectedRows
|
||||
|
||||
显示的是本地导入被 where 条件过滤的条数。
|
||||
|
||||
|
||||
*注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。
|
||||
|
||||
## 取消导入
|
||||
用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。
|
||||
|
||||
# Stream load 导入系统配置
|
||||
## FE conf 中的配置
|
||||
|
||||
+ timeout
|
||||
|
||||
导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。
|
||||
|
||||
目前 Stream load 并不支持自定义导入的 timeout 时间,所有 Stream load 导入的超时时间是统一的,默认的 timeout 时间为300秒。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```stream_load_default_timeout_second```。
|
||||
|
||||
## BE conf 中的配置
|
||||
|
||||
+ streaming\_load\_max\_mb
|
||||
|
||||
Stream load 的最大导入大小,默认为 10G,单位是 Mb。如果用户的原始文件超过这个值,则需要调整 BE 的参数 ```streaming_load_max_mb```。
|
||||
|
||||
# Stream load 最佳实践
|
||||
## 应用场景
|
||||
使用 stream load 的最合适场景就是原始文件在内存中,或者在磁盘中。其次,由于 Stream load 是一种同步的导入方式,所以用户如果希望用同步方式获取导入结果,也可以使用这种导入。
|
||||
|
||||
## 数据量
|
||||
由于 Stream load 的原理是由 BE 发起的导入,所以 Stream load 都是单机执行的,导入数据量在1 Byte ~ 20G 均可。由于默认的最大 Stream load 导入数据量为 10G,所以如果要导入超过 10G 的文件需要修改 BE 的配置 ```streaming_load_max_mb```
|
||||
|
||||
```
|
||||
比如:待导入文件大小为15G
|
||||
修改 BE 配置 streaming_load_max_mb 为 16000 即可。
|
||||
```
|
||||
|
||||
+ 数据量过大导致超时
|
||||
|
||||
Stream load 的默认超时为 300秒,按照 Doris 目前最大的导入限速来看,约超过 3G 的文件就需要修改导入任务默认超时时间了。
|
||||
|
||||
```
|
||||
导入任务超时时间 = 导入数据量 / 10M/s (具体的平均导入速度需要用户根据自己的集群情况计算)
|
||||
例如:导入一个 10G 的文件
|
||||
timeout = 1000s 等于 10G / 10M/s
|
||||
```
|
||||
|
||||
## 完整例子
|
||||
数据情况: 数据在发送导入请求端的本地磁盘路径 /home/store_sales 中,导入的数据量约为 15G,希望导入到数据库 bj_sales 的表 store_sales 中。
|
||||
|
||||
集群情况:Stream load 的并发数不受集群大小影响。
|
||||
|
||||
+ step1: 导入文件大小是否超过默认的最大导入大小10G
|
||||
|
||||
```
|
||||
修改 BE conf
|
||||
streaming_load_max_mb = 16000
|
||||
```
|
||||
+ step2: 计算大概的导入时间是否超过默认 timeout 值
|
||||
|
||||
```
|
||||
导入时间 ≈ 15000 / 10 = 1500s
|
||||
超过了默认的 timeout 时间,需要修改 FE 的配置
|
||||
stream_load_default_timeout_second = 1500
|
||||
|
||||
```
|
||||
|
||||
+ step3:创建导入任务
|
||||
|
||||
```
|
||||
curl --location-trusted -u root:password -T /home/store_sales -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load
|
||||
```
|
||||
|
||||
|
||||
# 常见问题
|
||||
|
||||
## Label Already Exists
|
||||
|
||||
Stream load 的 Label 重复排查步骤如下:
|
||||
|
||||
1. 是否和其他导入方式已经存在的导入 Label 冲突:
|
||||
|
||||
由于 Doris 系统中导入的 Label 不区分导入方式,所以存在其他导入方式使用了相同 Label 的问题。
|
||||
|
||||
通过 ``` SHOW LOAD WHERE LABEL = “xxx”```,其中 xxx 为重复的 Label 字符串,查看是否已经存在一个 FINISHED 导入的 Label 和用户申请创建的 Label 相同。
|
||||
|
||||
2. 是否 Stream load 同一个创建任务被重复提交了
|
||||
|
||||
由于 Stream load 是 HTTP 协议提交创建导入任务,一般各个语言的 HTTP Client 均会自带请求重试逻辑。Doris 系统在接受到第一个请求后,已经开始操作 Stream load,但是由于没有及时返回给 Client 端结果, Client 端会发生再次重试创建请求的情况。这时候 Doris 系统由于已经在操作第一个请求,所以第二个请求已经就会被报 Label Already Exists 的情况。
|
||||
|
||||
排查上述可能的方法:使用 Label 搜索 FE Master 的日志,看是否存在同一个 Label 出现了两次 ```redirect load action to destination= ``` 的情况。如果有就说明,请求被 Client 端重复提交了。
|
||||
|
||||
改进方法:
|
||||
+ 建议用户根据当前请求的数据量,计算出大致导入的时间,并根据导入超时时间改大 Client 端的请求超时时间,避免请求被 Client 端多次提交。
|
||||
|
||||
# Mini load
|
||||
|
||||
Mini load 支持的数据源,导入方式以及支持的 ETL 功能是 Stream load 的子集,且导入的效率和 Stream load 一致。**强烈建议使用 Stream load 替代 Mini load**
|
||||
|
||||
唯一不同的是,Mini load 由于为了兼容旧版本的使用习惯,用户不仅可以根据创建请求的返回值查看导入状态,也可以异步的通过查看导入命令查看导入状态。
|
||||
|
||||
Mini load的创建语法帮助可执行 ``` HELP MINI LOAD```。
|
||||
|
||||
|
||||
|
||||
|
||||
@ -10,18 +10,6 @@ INSERT INTO table_name
|
||||
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
|
||||
```
|
||||
|
||||
## Description
|
||||
|
||||
INSERT 向一张表里插入数据。用户可以通过 VALUES 语法插入一条或者多条数据,或者通过一个查询来插入0条或者多条数据。
|
||||
|
||||
partition是目标分区,如果指定目标分区,则只会导入符合目标分区的数据。如果没有指定,则默认值为这张表的所有分区。
|
||||
|
||||
column是目标列,可以以任意的顺序存在。如果没有指定目标列,那么默认值是这张表的所有列。
|
||||
|
||||
如果表中的某个列没有存在目标列中,那么这个列需要有默认值,否则 INSERT 就会执行失败。
|
||||
|
||||
如果表达式的类型与目标列的类型不一致,那么会调用隐式类型转化,如果不能够进行转化,那么 INSERT 语句会报语法解析错误。
|
||||
|
||||
## Parameters
|
||||
|
||||
> tablet_name: 导入数据的目的表。可以是 `db_name.table_name` 形式
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
## description
|
||||
|
||||
Broker load 通过随 Palo 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
|
||||
不同的数据源需要部署不同的 broker 进程。可以通过 show broker 命令查看已经部署的 broker。
|
||||
可以通过 show broker 命令查看已经部署的 broker。
|
||||
目前支持以下4种数据源:
|
||||
|
||||
1. Baidu HDFS:百度内部的 hdfs,仅限于百度内部使用。
|
||||
@ -110,7 +110,7 @@
|
||||
|
||||
3. broker_name
|
||||
|
||||
所使用的 broker 名称,可以通过 show broker 命令查看。不同的数据源需使用对应的 broker。
|
||||
所使用的 broker 名称,可以通过 show broker 命令查看。
|
||||
|
||||
4. broker_properties
|
||||
|
||||
@ -161,6 +161,7 @@
|
||||
max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。
|
||||
exec_mem_limit: 设置导入使用的内存上限。默认为2G,单位字节。这里是指单个 BE 节点的内存上限。
|
||||
一个导入可能分布于多个BE。我们假设 1GB 数据在单个节点处理需要最大5GB内存。那么假设1GB文件分布在2个节点处理,那么理论上,每个节点需要内存为2.5GB。则该参数可以设置为 2684354560,即2.5GB
|
||||
strict mode: 是否对数据进行严格限制。默认为true。
|
||||
|
||||
5. 导入数据格式样例
|
||||
|
||||
|
||||
@ -336,10 +336,10 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int load_running_job_num_limit = 0; // 0 is no limit
|
||||
/*
|
||||
* Default pull load timeout
|
||||
* Default broker load timeout
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int pull_load_task_default_timeout_second = 14400; // 4 hour
|
||||
public static int broker_load_default_timeout_second = 14400; // 4 hour
|
||||
|
||||
/*
|
||||
* Default non-streaming mini load timeout
|
||||
@ -361,16 +361,16 @@ public class Config extends ConfigBase {
|
||||
public static int stream_load_default_timeout_second = 600; // 300s
|
||||
|
||||
/*
|
||||
* Max stream load timeout
|
||||
* Max load timeout applicable to all type of load
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int max_stream_load_timeout_second = 259200; // 3days
|
||||
public static int max_load_timeout_second = 259200; // 3days
|
||||
|
||||
/*
|
||||
* Min stream load timeout
|
||||
* Min stream load timeout applicable to all type of load
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int min_stream_load_timeout_second = 1; // 1s
|
||||
public static int min_load_timeout_second = 1; // 1s
|
||||
|
||||
/*
|
||||
* Default hadoop load timeout
|
||||
|
||||
@ -630,7 +630,7 @@ public class Load {
|
||||
} else if (etlJobType == EtlJobType.BROKER) {
|
||||
if (job.getTimeoutSecond() == 0) {
|
||||
// set default timeout
|
||||
job.setTimeoutSecond(Config.pull_load_task_default_timeout_second);
|
||||
job.setTimeoutSecond(Config.broker_load_default_timeout_second);
|
||||
}
|
||||
} else if (etlJobType == EtlJobType.INSERT) {
|
||||
job.setPrority(TPriority.HIGH);
|
||||
|
||||
@ -84,7 +84,7 @@ public class BrokerLoadJob extends LoadJob {
|
||||
public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, List<DataDescription> dataDescriptions)
|
||||
throws MetaNotFoundException {
|
||||
super(dbId, label);
|
||||
this.timeoutSecond = Config.pull_load_task_default_timeout_second;
|
||||
this.timeoutSecond = Config.broker_load_default_timeout_second;
|
||||
this.dataDescriptions = dataDescriptions;
|
||||
this.brokerDesc = brokerDesc;
|
||||
this.jobType = EtlJobType.BROKER;
|
||||
|
||||
@ -86,9 +86,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
|
||||
// optional properties
|
||||
// timeout second need to be reset in constructor of subclass
|
||||
protected long timeoutSecond = Config.pull_load_task_default_timeout_second;
|
||||
protected long timeoutSecond = Config.broker_load_default_timeout_second;
|
||||
protected long execMemLimit = 2147483648L; // 2GB;
|
||||
protected double maxFilterRatio = 0;
|
||||
@Deprecated
|
||||
protected boolean deleteFlag = false;
|
||||
protected boolean strictMode = true;
|
||||
|
||||
|
||||
@ -109,7 +109,7 @@ public class PullLoadTask {
|
||||
|
||||
private long getLeftTimeMs() {
|
||||
if (jobDeadlineMs <= 0) {
|
||||
return Config.pull_load_task_default_timeout_second * 1000;
|
||||
return Config.broker_load_default_timeout_second * 1000;
|
||||
}
|
||||
return jobDeadlineMs - System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@ -130,10 +130,10 @@ public class GlobalTransactionMgr {
|
||||
throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented");
|
||||
}
|
||||
|
||||
if (timeoutSecond > Config.max_stream_load_timeout_second ||
|
||||
timeoutSecond < Config.min_stream_load_timeout_second) {
|
||||
if (timeoutSecond > Config.max_load_timeout_second ||
|
||||
timeoutSecond < Config.min_load_timeout_second) {
|
||||
throw new AnalysisException("Invalid timeout. Timeout should between "
|
||||
+ Config.min_stream_load_timeout_second + " and " + Config.max_stream_load_timeout_second
|
||||
+ Config.min_load_timeout_second + " and " + Config.max_load_timeout_second
|
||||
+ " seconds");
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user