diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md index 9cfd519cfd..4c21c21798 100644 --- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md @@ -1,26 +1,30 @@ # Broker Load -Broker load 是一个异步的导入方式,主要支持的数据源有 HDFS, BOS, AFS。用户需要通过 Mysql client 创建 Broker load 导入,并通过查看导入命令检查导入结果。 -Broker load 主要适用于源数据在文件系统中的,或者是超大文件导入的需求。 +Broker load 是一个异步的导入方式,支持的数据源取决于 Broker 进程支持的数据源。 -# 名词解释 -1. HDFS: Hadoop 分布式文件系统。用户的待导入数据可能存放的位置。 -2. BOS:百度云提供的对象存储系统,百度云上的 Palo 用户的待导入数据可能存放的位置。 -3. AFS:百度内部提供的超大规模文件系统,百度厂内的 Palo 用户的待导入数据可能存放的位置。 -4. plan:导入执行计划,BE 会执行导入执行计划将数据导入到 Doris 系统中。 -5. Broker:Doris 系统中负责读取外部文件系统数据的组件。 -6. backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。 -7. frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入 plan 生成和导入任务的调度工作。 +用户需要通过 MySQL 协议 创建 Broker load 导入,并通过查看导入命令检查导入结果。 -# 基本原理 +## 适用场景 -用户在提交导入任务后,FE 会生成对应的 plan 并根据目前 BE 的个数和文件的大小,将 plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。 +* 源数据在 Broker 可以访问的存储系统中,如 HDFS。 +* 数据量在 几十到百GB 级别。 -BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,并且事务被 publish 后,导入完成。 +## 名词解释 + +1. Frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入 plan 生成和导入任务的调度工作。 +2. Backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。 +3. Broker:Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统中文件的能力。 +4. Plan:导入执行计划,BE 会执行导入执行计划将数据导入到 Doris 系统中。 + +## 基本原理 + +用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。 + +BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。 ``` + - | user create broker load + | 1. user create broker load v +----+----+ | | @@ -28,359 +32,420 @@ BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之 | | +----+----+ | - | BE etl and load the data + | 2. BE etl and load the data +--------------------------+ | | | +---v---+ +--v----+ +---v---+ | | | | | | | BE | | BE | | BE | | | | | | | -+---+---+ +---+---+ +----+--+ - | | | - | | | pull data from broker -+---v---+ +---v---+ +----v--+ ++---+-^-+ +---+-^-+ +--+-^--+ + | | | | | | + | | | | | | 3. pull data from broker ++---v-+-+ +---v-+-+ +--v-+--+ | | | | | | |Broker | |Broker | |Broker | | | | | | | -+-------+--+ +---+---+ ++------+ - | | | - | | | - +-v------v---------v--+ - |HDFS/BOS/AFS cluster | - | | - +---------------------+ ++---+-^-+ +---+-^-+ +---+-^-+ + | | | | | | ++---v-+-----------v-+----------v-+-+ +| HDFS/BOS/AFS cluster | +| | ++----------------------------------+ ``` -# 基本操作 -## 创建导入 +## 基本操作 + +### 创建导入 Broker load 创建导入语句 +语法: + ``` -语法 -LOAD LABEL load_label - (data_desc, ...) - with broker broker_name broker_properties - [PROPERTIES (key1=value1, ... )] +LOAD LABEL db_name.label_name +(data_desc, ...) +WITH BROKER broker_name broker_properties +[PROPERTIES (key1=value1, ... )] -LABEL: db_name.label_name +* data_desc: -data_desc: - DATA INFILE ('file_path', ...) - [NEGATIVE] - INTO TABLE tbl_name - [PARTITION (p1, p2)] - [COLUMNS TERMINATED BY separator ] - [(col1, ...)] - [SET (k1=f1(xx), k2=f2(xx))] + DATA INFILE ('file_path', ...) + [NEGATIVE] + INTO TABLE tbl_name + [PARTITION (p1, p2)] + [COLUMNS TERMINATED BY separator ] + [(col1, ...)] + [SET (k1=f1(xx), k2=f2(xx))] -broker_name: string - -broker_properties: (key1=value1, ...) +* broker_properties: + (key1=value1, ...) +``` 示例: -load label test.int_table_01 - (DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/id_name") into table id_name_table columns terminated by "," (tmp_c1,tmp_c2) set (id=tmp_c2, name=tmp_c1)) - with broker 'broker' ("username"="root", "password"="3trqDWfl") - properties ("timeout" = "10000" ); + +``` +LOAD LABEL db1.label1 +( + DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1") + INTO TABLE tbl1 + COLUMNS TERMINATED BY "," + (tmp_c1,tmp_c2) + SET + ( + id=tmp_c2, + name=tmp_c1) + ), + DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2") + INTO TABLE tbl2 + COLUMNS TERMINATED BY "," + (col1, col2) +) +WITH BROKER 'broker' +( + "username"="user", + "password"="pass" +) +PROPERTIES +( + "timeout" = "3600" +); ``` -创建导入的详细语法执行 ``` HELP BROKER LOAD;``` 查看语法帮助。这里主要介绍 Broker load 的创建导入语法中参数意义和注意事项。 +创建导入的详细语法执行 ```HELP BROKER LOAD``` 查看语法帮助。这里主要介绍 Broker load 的创建导入语法中参数意义和注意事项。 -+ label +#### Label - 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。 - - label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At most once** - - 当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。 +导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 Label。Label 是用户在导入命令中自定义的名称。通过这个 Label,用户可以查看对应导入任务的执行情况。 + +Label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once 语义** + +当 Label 对应的导入作业状态为 CANCELLED 时,可以再次使用该 Label 提交导入作业。 -### 数据描述类参数 +#### 数据描述类参数 数据描述类参数主要指的是 Broker load 创建导入语句中的属于 ```data_desc``` 部分的参数。每组 ```data_desc ``` 主要表述了本次导入涉及到的数据源地址,ETL 函数,目标表及分区等信息。 下面主要对数据描述类的部分参数详细解释: -+ 多表导入 ++ 多表导入 - Broker load 支持一次导入任务涉及多张表,每个 Broker load 导入任务可在多个 ``` data_desc ``` 声明多张表来实现多表导入。每个单独的 ```data_desc``` 还可以指定属于该表的数据源地址。Broker load 保证了单次导入的多张表之间原子性成功或失败。 + Broker load 支持一次导入任务涉及多张表,每个 Broker load 导入任务可在多个 ``` data_desc ``` 声明多张表来实现多表导入。每个单独的 ```data_desc``` 还可以指定属于该表的数据源地址。Broker load 保证了单次导入的多张表之间原子性成功或失败。 -+ negative ++ negative - ```data_desc```中还可以设置数据取反导入。这个功能主要用于,当用户上次导入已经成功,但是想取消上次导入的数据时,就可以使用相同的源文件并设置导入 nagetive 取反功能。上次导入的数据就可以被撤销了。 + ```data_desc```中还可以设置数据取反导入。这个功能主要用于,当数据表中聚合列的类型都为 SUM 类型时。如果希望撤销某一批导入的数据。则可以通过 `negative` 参数当如同一批数据。Doris 会自动为这一批数据在聚合列上数据取反,以达到消除同一批数据的功能。 + ++ partition - 该功能仅对聚合类型为 SUM 的列有效。聚合列数值在取反导入后会根据待导入数据值,不断递减。 - -+ Partition + 在 ```data_desc``` 中可以指定待导入表的 partition 信息,如果待导入数据不属于指定的 partition 则不会被导入。同时,不在指定 Partition 的数据会被认为是错误数据。 + +#### 导入作业参数 - 在 ``` data_desc ``` 中可以指定待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 ```dpp.abnorm.ALL ``` - -### 导入任务参数 +导入作业参数主要指的是 Broker load 创建导入语句中的属于 ```opt_properties```部分的参数。导入作业参数是作用于整个导入作业的。 -导入任务参数主要指的是 Broker load 创建导入语句中的属于 ```opt_properties```部分你的参数。导入任务参数是作用于整个导入任务的。 - -下面主要对导入任务参数的部分参数详细解释: +下面主要对导入作业参数的部分参数详细解释: + timeout - - 导入任务的超时时间(以秒为单位),用户可以在```opt_properties```中自行设置每个导入的超时时间。导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。Broker load 的默认导入超时时间为4小时。 - - 通常情况下,用户不需要手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。 - - **推荐超时时间** - - ``` - 总文件大小(MB) / 用户 Doris 集群最慢导入速度(MB/s) > timeout > ((总文件大小(MB) * 待导入的表及相关 Roll up 表的个数) / (10 * 导入并发数) ) - - 导入并发数见文档最后的导入系统配置说明,公式中的 10 为目前的导入限速 10MB/s。 - - ``` - - 例如一个 1G 的待导入数据,待导入表涉及了3个 Roll up 表,当前的导入并发数为 3。则 timeout 的 最小值为 ```(1 * 1024 * 3 ) / (10 * 3) = 102 秒``` - - 由于每个 Doris 集群的机器环境不同且集群并发的查询任务也不同,所以用户 Doris 集群的最慢导入速度需要用户自己根据历史的导入任务速度进行推测。 - - *注意:用户设置导入超时时间最好在上述范围内。如果设置的超时时间过长,可能会导致总导入任务的个数超过 Doris 系统限制,导致后续提交的导入被系统取消* - + + 导入作业的超时时间(以秒为单位),用户可以在 ```opt_properties``` 中自行设置每个导入的超时时间。导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。Broker load 的默认导入超时时间为4小时。 + + 通常情况下,用户不需要手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。 + + > 推荐超时时间 + > + > 总文件大小(MB) / 用户 Doris 集群最慢导入速度(MB/s) > timeout > ((总文件大小(MB) * 待导入的表及相关 Roll up 表的个数) / (10 * 导入并发数) ) + + > 导入并发数见文档最后的导入系统配置说明,公式中的 10 为目前的导入限速 10MB/s。 + + > 例如一个 1G 的待导入数据,待导入表包含3个 Rollup 表,当前的导入并发数为 3。则 timeout 的 最小值为 ```(1 * 1024 * 3 ) / (10 * 3) = 102 秒``` + + 由于每个 Doris 集群的机器环境不同且集群并发的查询任务也不同,所以用户 Doris 集群的最慢导入速度需要用户自己根据历史的导入任务速度进行推测。 + + max\_filter\_ratio - 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的 filter ratio 超过该值,则导入失败。计算公式为: ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )> max_filter_ratio ``` - - ``` dpp.abnorm.ALL ``` 在代码中也叫 ``` num_rows_filtered``` 指的是导入过程中被过滤的错误数据。比如:列在表中为非空列,但是导入数据为空则为错误数据。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的错误数据量。 - - ``` dpp.norm.ALL ``` 指的是导入过程中正确数据的条数。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的正确数据量。 - - ``` 原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL ``` - + 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。 + + 如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 + + 计算公式为: + + ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio ``` + + ```dpp.abnorm.ALL``` 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。 + + ```dpp.norm.ALL``` 指的是导入过程中正确数据的条数。可以通过 ```SHOW LOAD``` 命令查询导入任务的正确数据量。 + + 原始文件的行数 = `dpp.abnorm.ALL + dpp.norm.ALL` + + exec\_mem\_limit - 导入任务的内存使用上限。当导入任务使用的内存超过设定上限时,导入任务会被 CANCEL。系统默认的内存上限为2G。 - - 设定导入任务内存上限,可以保证各个导入任务之间以及导入任务和查询之间相互不影响。导入任务的内存上限不宜过大,过大可能会导致导入占满 be 内存,查询无法快速返回结果。 - - ```导入任务内存使用量 < 文件大小 / 当前执行导入任务的并发数``` + 导入任务的内存使用上限。当导入任务使用的内存超过设定上限时,导入任务会被 CANCEL。默认为 2G,单位为字节。 + + 当导入出现 `Memory exceed limit` 错误时,可以适当调整这个参数,如调整为 4G、8G 等。 -+ strict mode ++ strict\_mode - Broker load 导入可以开启 strict mode模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode为开启。 + Broker load 导入可以开启 strict mode 模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode 为开启。 - strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: + strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下: - 1. 对于列类型转换来说,如果 strict\_mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。 + 1. 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。 - 2. 对于导入的某列包含函数变换的,导入的值和函数的结果一致,strict 对其不产生影响。(其中 strftime 等 Broker load 支持的函数也属于这类)。 + 2. 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。 + + 3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。 - 3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict 对其也不产生影响。 - + 例如:如果类型是 decimal(1,0), 原始数据为10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。 +#### strict mode 与 source data 的导入关系 - #### strict mode 与 source data 的导入关系 +这里以列类型为 TinyInt 来举例 - 这里以列类型为 TinyInt 来举例 +>注:当表中的列允许导入空值时 - 注:当表中的列允许导入空值时 +|source data | source data example | string to int | strict_mode | result| +|------------|---------------------|-----------------|--------------------|---------| +|空值 | \N | N/A | true or false | NULL| +|not null | aaa or 2000 | NULL | true | invalid data(filtered)| +|not null | aaa | NULL | false | NULL| +|not null | 1 | 1 | true or false | correct data| - source data | source data example | string to int | strict_mode | load_data -------------|---------------------|-----------------|--------------------|--------- -空值 | \N | N/A | true or false | NULL -not null | aaa or 2000 | NULL | true | filtered -not null | aaa | NULL | false | NULL -not null | 1 | 1 | true or false | correct data - - 这里以列类型为 Decimal(1,0) 举例 +这里以列类型为 Decimal(1,0) 举例 - 注:当表中的列允许导入空值时 +>注:当表中的列允许导入空值时 - source data | source data example | string to int | strict_mode | load_data -------------|---------------------|-----------------|--------------------|--------- -空值 | \N | N/A | true or false | NULL -not null | aaa | NULL | true | filtered -not null | aaa | NULL | false | NULL -not null | 1 or 10 | 1 | true or false | correct data +|source data | source data example | string to int | strict_mode | result| +|------------|---------------------|-----------------|--------------------|--------| +|空值 | \N | N/A | true or false | NULL| +|not null | aaa | NULL | true | invalid data(filtered)| +|not null | aaa | NULL | false | NULL| +|not null | 1 or 10 | 1 | true or false | correct data| - *注意:10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode过滤。* +> 注意:10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode 过滤。 -## 查看导入 +### 查看导入 -Broker load 导入方式由于是异步的,所以用户必须将创建导入的 Label 记录,并且在**查看导入命令中使用 Label 来查看导入结果**。查看导入命令在所有导入方式中是通用的,具体语法可执行 ``` HELP SHOW LOAD ```查看。 +Broker load 导入方式由于是异步的,所以用户必须将创建导入的 Label 记录,并且在**查看导入命令中使用 Label 来查看导入结果**。查看导入命令在所有导入方式中是通用的,具体语法可执行 ```HELP SHOW LOAD``` 查看。 + +示例: + +``` +mysql> show load order by createtime desc limit 1\G +*************************** 1. row *************************** + JobId: 76391 + Label: label1 + State: FINISHED + Progress: ETL:N/A; LOAD:100% + Type: BROKER + EtlInfo: dpp.abnorm.ALL=15; dpp.norm.ALL=28133376 + TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5 + ErrorMsg: N/A + CreateTime: 2019-07-27 11:46:42 + EtlStartTime: 2019-07-27 11:46:44 + EtlFinishTime: 2019-07-27 11:46:44 + LoadStartTime: 2019-07-27 11:46:44 +LoadFinishTime: 2019-07-27 11:50:16 + URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317 +``` 下面主要介绍了查看导入命令返回结果集中参数意义: + JobId - 导入任务的唯一标识,每个导入任务的JobId都不同。与 Label 不同的是,JobId永远不会相同,而 Label 则可以在导入任务失败后被复用。 - + 导入任务的唯一ID,每个导入任务的 JobId 都不同,有系统自动生成。与 Label 不同的是,JobId永远不会相同,而 Label 则可以在导入任务失败后被复用。 + + Label - 导入任务的标识。 - + 导入任务的标识。 + + State - 导入任务当前所处的阶段。在 Broker load 导入过程中主要会出现 PENDING 和 LOADING 这两个导入中的状态。如果 Broker load 处于 PENDING 状态,则说明当前导入任务正在等待被执行, LOADING 状态则表示正在执行中( transform and loading data)。 - - 导入任务的最终阶段有两个: CANCELLED 和 FINISHED,当 Load job 处于这两个阶段时,导入完成。其中 CANCELLED 为导入失败,FINISHED 为导入成功。 - + 导入任务当前所处的阶段。在 Broker load 导入过程中主要会出现 PENDING 和 LOADING 这两个导入中的状态。如果 Broker load 处于 PENDING 状态,则说明当前导入任务正在等待被执行;LOADING 状态则表示正在执行中。 + + 导入任务的最终阶段有两个:CANCELLED 和 FINISHED,当 Load job 处于这两个阶段时,导入完成。其中 CANCELLED 为导入失败,FINISHED 为导入成功。 + + Progress - 导入任务的进度描述。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。目前 Broker load 由于只有 LOADING 阶段,所以 ETL 则会永远显示为 ```N/A ``` - - LOAD 的进度范围为:0~100%。``` LOAD 进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%``` - - **如果所有导入表均完成导入,此时 LOAD 的进度为 99%** 导入进入到最后生效阶段,整个导入完成后,LOAD 的进度才会改为 100%。 - + 导入任务的进度描述。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。目前 Broker load 由于只有 LOADING 阶段,所以 ETL 则会永远显示为 `N/A` + + LOAD 的进度范围为:0~100%。 + + ```LOAD 进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%``` + + **如果所有导入表均完成导入,此时 LOAD 的进度为 99%** 导入进入到最后生效阶段,整个导入完成后,LOAD 的进度才会改为 100%。 + + 导入进度并不是线性的。所以如果一段时间内进度没有变化,并不代表导入没有在执行。 + + Type - 导入任务的类型。Broker load 的 type 取值只有 BROKER。 + 导入任务的类型。Broker load 的 type 取值只有 BROKER。 + EtlInfo - 主要显示了导入的数据量指标 ``` dpp.norm.ALL 和 dpp.abnorm.ALL```。用户可以根据这两个指标验证当前导入任务的 filter ratio 是否超过 max\_filter\_ratio. - + 主要显示了导入的数据量指标 ```dpp.norm.ALL 和 dpp.abnorm.ALL```。用户可以根据这两个指标验证当前导入任务的错误率是否超过 max\_filter\_ratio。 + + TaskInfo - 主要显示了当前导入任务参数,也就是创建 Broker load 导入任务时用户指定的导入任务参数,包括:cluster,timeout和max\_filter\_ratio。 - + 主要显示了当前导入任务参数,也就是创建 Broker load 导入任务时用户指定的导入任务参数,包括:`cluster`,`timeout` 和`max_filter_ratio`。 + + ErrorMsg - 在导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type和msg,如果导入任务成功则显示 ```N/A```。 - - ``` - type的取值意义 - USER_CANCEL: 用户取消的任务 - ETL_RUN_FAIL:在ETL阶段失败的导入任务 - ETL_QUALITY_UNSATISFIED:数据质量不合格,也就是错误数据率超过了 max_filter_ratio - LOAD_RUN_FAIL:在LOADING阶段失败的导入任务 - TIMEOUT:导入任务没在超时时间内完成 - UNKNOWN:未知的导入错误 - - ``` - + 在导入任务状态为CANCELLED,会显示失败的原因,显示分两部分:type 和 msg,如果导入任务成功则显示 ```N/A```。 + + type的取值意义: + + ``` + USER_CANCEL: 用户取消的任务 + ETL_RUN_FAIL:在ETL阶段失败的导入任务 + ETL_QUALITY_UNSATISFIED:数据质量不合格,也就是错误数据率超过了 max_filter_ratio + LOAD_RUN_FAIL:在LOADING阶段失败的导入任务 + TIMEOUT:导入任务没在超时时间内完成 + UNKNOWN:未知的导入错误 + ``` + CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime - 这几个值分别代表导入创建的时间,ETL阶段开始的时间,ETL阶段完成的时间,Loading阶段开始的时间和整个导入任务完成的时间。 - - Broker load 导入由于没有 ETL 阶段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被设置为同一个值。 - - 导入任务长时间停留在CreateTime,而 LoadStartTime 为 N/A 则说明目前导入任务堆积严重。用户可减少导入提交的频率。 - - ``` - LoadFinishTime - CreateTime = 整个导入任务所消耗时间 - LoadFinishTime - LoadStartTime = 整个 Broker load 导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间 - ``` - + 这几个值分别代表导入创建的时间,ETL阶段开始的时间,ETL阶段完成的时间,Loading阶段开始的时间和整个导入任务完成的时间。 + + Broker load 导入由于没有 ETL 阶段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被设置为同一个值。 + + 导入任务长时间停留在 CreateTime,而 LoadStartTime 为 N/A 则说明目前导入任务堆积严重。用户可减少导入提交的频率。 + + ``` + LoadFinishTime - CreateTime = 整个导入任务所消耗时间 + LoadFinishTime - LoadStartTime = 整个 Broker load 导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间 + ``` + + URL - 导入任务的错误数据样例,访问 URL 地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL 字段则为 N/A。 + 导入任务的错误数据样例,访问 URL 地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL 字段则为 N/A。 -## 取消导入 +### 取消导入 -Broker load 可以被用户手动取消,取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 ``` HELP CANCEL LOAD ```查看。 +当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 ```HELP CANCEL LOAD```查看。 -# Broker load 导入系统配置 -## FE conf 中的配置 +## 相关系统配置 + +### FE 配置 下面几个配置属于 Broker load 的系统级别配置,也就是作用于所有 Broker load 导入任务的配置。主要通过修改 ``` fe.conf```来调整配置值。 -+ min\_bytes\_per\_broker\_scanner,max\_bytes\_per\_broker\_scanner 和 max\_broker\_concurrency - - 前两个配置限制了单个 BE 处理的数据量的最小和最大值。第三个配置限制了最大的导入并发数。最小处理的数据量,最大并发数,源文件的大小和当前集群 BE 的个数**共同决定了本次导入的并发数**。 - - ```本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)``` - ```本次导入单个BE的处理量 = 源文件大小/本次导入的并发数``` - - 如果,``` 本次导入单个BE的处理量 > 最大处理的数据量```则导入任务会被取消。一般来说,导入的文件过大可能导致这个问题,通过上述公式,预先计算出单个 BE 的预计处理量并调整最大处理量即可解决问题。 - - ``` - 默认配置: - 参数名:min_bytes_per_broker_scanner, default 64MB,单位bytes - 参数名:max_broker_concurrency, default 10 - 参数名:max_bytes_per_broker_scanner,default 3G,单位bytes - - ``` - -# Broker load 最佳实践 ++ min\_bytes\_per\_broker\_scanner/max\_bytes\_per\_broker\_scanner/max\_broker\_concurrency + + 前两个配置限制了单个 BE 处理的数据量的最小和最大值。第三个配置限制了一个作业的最大的导入并发数。最小处理的数据量,最大并发数,源文件的大小和当前集群 BE 的个数 **共同决定了本次导入的并发数**。 + + ``` + 本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数) + 本次导入单个BE的处理量 = 源文件大小/本次导入的并发数 + ``` + + 通常一个导入作业支持的最大数据量为 `max_bytes_per_broker_scanner * BE 节点数`。如果需要导入更大数据量,则需要适当调整 `max_bytes_per_broker_scanner` 参数的大小。 + + 默认配置: + + ``` + 参数名:min_bytes_per_broker_scanner, 默认 64MB,单位bytes。 + 参数名:max_broker_concurrency, 默认 10。 + 参数名:max_bytes_per_broker_scanner,默认 3G,单位bytes。 + ``` + +## 最佳实践 + +### 应用场景 -## 应用场景 使用 Broker load 最适合的场景就是原始数据在文件系统(HDFS,BOS,AFS)中的场景。其次,由于 Broker load 是单次导入中唯一的一种异步导入的方式,所以如果用户在导入大文件中,需要使用异步接入,也可以考虑使用 Broker load。 -## 数据量 +### 数据量 这里仅讨论单个 BE 的情况,如果用户集群有多个 BE 则下面标题中的数据量应该乘以 BE 个数来计算。比如:如果用户有3个 BE,则 3G 以下(包含)则应该乘以 3,也就是 9G 以下(包含)。 + 3G 以下(包含) - 用户可以直接提交 Broker load 创建导入请求。 - + 用户可以直接提交 Broker load 创建导入请求。 + + 3G 以上 - 由于单个导入 BE 最大的处理量为 3G,超过 3G 的待导入文件就需要通过调整 Broker load 的导入参数来实现大文件的导入。 - - 1. 根据当前 BE 的个数和原始文件的大小修改单个 BE 的最大扫描量和最大并发数。 + 由于单个导入 BE 最大的处理量为 3G,超过 3G 的待导入文件就需要通过调整 Broker load 的导入参数来实现大文件的导入。 + + 1. 根据当前 BE 的个数和原始文件的大小修改单个 BE 的最大扫描量和最大并发数。 - ``` - 修改 fe.conf 中配置 - - max_broker_concurrency = BE 个数 - 当前导入任务单个 BE 处理的数据量 = 原始文件大小 / max_broker_concurrency - max_bytes_per_broker_scanner >= 当前导入任务单个 BE 处理的数据量 - - 比如一个 100G 的文件,集群的 BE 个数为 10个 - max_broker_concurrency = 10 - max_bytes_per_broker_scanner >= 10G = 100G / 10 - - ``` - - 修改后,所有的 BE 会并发的处理导入任务,每个 BE 处理原始文件的一部分。 - - *注意:上述两个 FE 中的配置均为系统配置,也就是说其修改是作用于所有的 Broker load的任务的。* - - 2. 在创建导入的时候自定义当前导入任务的 timeout 时间 + ``` + 修改 fe.conf 中配置 + + max_broker_concurrency = BE 个数 + 当前导入任务单个 BE 处理的数据量 = 原始文件大小 / max_broker_concurrency + max_bytes_per_broker_scanner >= 当前导入任务单个 BE 处理的数据量 + + 比如一个 100G 的文件,集群的 BE 个数为 10 个 + max_broker_concurrency = 10 + max_bytes_per_broker_scanner >= 10G = 100G / 10 + + ``` + + 修改后,所有的 BE 会并发的处理导入任务,每个 BE 处理原始文件的一部分。 + + *注意:上述两个 FE 中的配置均为系统配置,也就是说其修改是作用于所有的 Broker load的任务的。* + + 2. 在创建导入的时候自定义当前导入任务的 timeout 时间 - ``` - 当前导入任务单个 BE 处理的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 当前导入任务的 timeout 时间 >= 当前导入任务单个 BE 处理的数据量 / 10M/s - - 比如一个 100G 的文件,集群的 BE 个数为 10个 - timeout >= 1000s = 10G / 10M/s - - ``` - - 3. 当用户发现第二步计算出的 timeout 时间超过系统默认的导入最大超时时间 4小时 + ``` + 当前导入任务单个 BE 处理的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 当前导入任务的 timeout 时间 >= 当前导入任务单个 BE 处理的数据量 / 10M/s + + 比如一个 100G 的文件,集群的 BE 个数为 10个 + timeout >= 1000s = 10G / 10M/s + + ``` + + 3. 当用户发现第二步计算出的 timeout 时间超过系统默认的导入最大超时时间 4小时 - 这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。 - - 可以通过如下公式计算出 Doris 集群期望最大导入文件数据量: - - ``` - 期望最大导入文件数据量 = 14400s * 10M/s * BE 个数 - 比如:集群的 BE 个数为 10个 - 期望最大导入文件数据量 = 14400 * 10M/s * 10 = 1440000M ≈ 1440G - - 注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。 - - ``` - -## 完整例子 + 这时候不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间4小时,最好是通过切分待导入文件并且分多次导入来解决问题。主要原因是:单次导入超过4小时的话,导入失败后重试的时间成本很高。 + + 可以通过如下公式计算出 Doris 集群期望最大导入文件数据量: + + ``` + 期望最大导入文件数据量 = 14400s * 10M/s * BE 个数 + 比如:集群的 BE 个数为 10个 + 期望最大导入文件数据量 = 14400 * 10M/s * 10 = 1440000M ≈ 1440G + + 注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。 + + ``` + +### 完整例子 数据情况:用户数据在 HDFS 中,文件地址为 hdfs://abc.com:8888/store_sales, hdfs 的认证用户名为 root, 密码为 password, 数据量大小约为 30G,希望导入到数据库 bj_sales 的表 store_sales 中。 -集群情况:集群的 BE 个数约为 3 个,集群中有 3 个 Broker 名称均为 broker。 +集群情况:集群的 BE 个数约为 3 个,Broker 名称均为 broker。 + step1: 经过上述方法的计算,本次导入的单个 BE 导入量为 10G,则需要先修改 FE 的配置,将单个 BE 导入最大量修改为: - ``` - max_bytes_per_broker_scanner = 100000000000 + ``` + max_bytes_per_broker_scanner = 10737418240 - ``` + ``` + step2: 经计算,本次导入的时间大约为 1000s,并未超过默认超时时间,可不配置导入自定义超时时间。 + step3:创建导入语句 - ``` - load label store_sales_broker_load_01.bj_sales - (DATA INFILE("hdfs://abc.com:8888/store_sales") into table store_sales - with broker 'broker' ("username"="root", "password"="password") - ``` - + ``` + LOAD LABEL bj_sales.store_sales_broker_load_01 + ( + DATA INFILE("hdfs://abc.com:8888/store_sales") + INTO TABLE store_sales + ) + WITH BROKER 'broker' + ("username"="root", "password"="password"); + ``` + +## 常见问题 + +* 导入报错:`Scan bytes per broker scanner exceed limit:xxx` + + 请参照文档中最佳实践部分,修改 FE 配置项 `max_bytes_per_broker_scanner` 和 `max_broker_concurrency` + +* 导入报错:`failed to send batch` 或 `TabletWriter add batch with unknown id` + + 请参照 [导入手册](./load-manual.md) 中 **通用系统配置** 中 **BE 配置**,适当修改 `tablet_writer_rpc_timeout_sec` 和 `streaming_load_rpc_max_alive_time_sec`。 + diff --git a/docs/documentation/cn/administrator-guide/load-data/index.rst b/docs/documentation/cn/administrator-guide/load-data/index.rst index fe908259db..f4807d1df8 100644 --- a/docs/documentation/cn/administrator-guide/load-data/index.rst +++ b/docs/documentation/cn/administrator-guide/load-data/index.rst @@ -4,6 +4,9 @@ .. toctree:: :maxdepth: 2 - :glob: - * + load-manual.md + broker-load-manual.md + stream-load-manual.md + routine-load-manual.md + insert-into-manual.md diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index 80b82daf4d..c80b4f87cf 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -1,99 +1,103 @@ -# Insert into +# Insert Into -Insert into 主要用来将某几行数据插入某张表,或者将已经存在在 Doris 系统中的部分数据导入到目标表中。 +Insert Into 语句的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似。但在 Doris 中,所有的数据写入都是一个独立的导入作业。所以这里将 Insert Into 也作为一种导入方式介绍。 -目前所有的 Insert into 导入实现均为同步执行。但是为了兼容旧的 Insert into 导入,不指定 STREAMING 的 Insert into 同步执行完后,依然可以通过查看导入命令查看导入状态和结果。 +主要的 Insert Into 命令包含以下两种; -# 基本操作 -## 创建导入 +* INSERT INTO tbl SELECT ... +* INSERT INTO tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...); -Insert into 创建导入请求需要通过 Mysql 协议提交,创建导入请求会同步返回导入结果。 +其中第二种命令仅用于 Demo,不要使用在测试或生产环境中。 -Insert info 创建导入语句 +## 基本操作 + +### 创建导入 + +Insert Into 命令需要通过 MySQL 协议提交,创建导入请求会同步返回导入结果。 + +语法: ``` -语法: -INSERT INTO table_name [partition_info] [col_list] [plan_hints] [query_stmt] [VALUES] +INSERT INTO table_name [partition_info] [col_list] [query_stmt] [VALUES]; +``` 示例: -insert into char_4_new values ("qweasdzxcqweasdzxc"), ("a"); -insert into char_4_new select * from char_3_new; + +``` +INSERT INTO tbl2 SELECT * FROM tbl3; +INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); ``` 下面主要介绍创建导入语句中使用到的参数: -+ query\_stmt - - 通过一个查询语句,将查询语句的结果导入到 Doris 系统中的其他表。 - -+ VALUES - - 用户可以通过 VALUES 语法插入一条或者多条数据。 - - *注意:VALUES 方式仅适用于导入几条数据作为导入 DEMO 的情况,完全不适用于任何测试和生产环境。Doris 系统本身也不适合单条数据导入的场景。建议使用 Insert into select 的方式进行批量导入。* - + partition\_info - 导入表的目标分区,如果指定目标分区,则只会导入符合目标分区的数据。如果没有指定,则默认值为这张表的所有分区。所有不属于当前目标分区的数据均属于 ``` num_rows_unselected ```,不参与 ``` max_filter_ratio ``` 的计算。 - + 导入表的目标分区,如果指定目标分区,则只会导入符合目标分区的数据。如果没有指定,则默认值为这张表的所有分区。 + + col\_list - 导入表的目标列,可以以任意的顺序存在。如果没有指定目标列,那么默认值是这张表的所有列。如果待表中的某个列没有存在目标列中,那么这个列需要有默认值,否则 INSERT 就会执行失败。 + 导入表的目标列,可以以任意的顺序存在。如果没有指定目标列,那么默认值是这张表的所有列。如果待表中的某个列没有存在目标列中,那么这个列需要有默认值,否则 Insert Into 就会执行失败。 - 如果查询语句的结果列类型与目标列的类型不一致,那么会调用隐式类型转化,如果不能够进行转化,那么 INSERT 语句会报语法解析错误。 - -+ plan\_hints + 如果查询语句的结果列类型与目标列的类型不一致,那么会调用隐式类型转化,如果不能够进行转化,那么 Insert Into 语句会报语法解析错误。 - 可指定当前导入的导入方式,目前无论是否指定 hint 导入方式均为同步,用户均可以通过创建导入请求的结果判断导入结果。 - - 唯一不同的是,不指定 STREAMING 的导入方式,由于为了兼容旧的使用习惯,用户依旧可以通过查看导入命令看到导入的结果。但导入实际上已经在创建导入返回结果的时候就已经完成了。 - -## 导入结果 ++ query\_stmt -Insert into 的导入结果也就是创建导入命令的返回值。 + 通过一个查询语句,将查询语句的结果导入到 Doris 系统中的其他表。查询语句支持任意 Doris 支持的 SQL 查询语法。 -指定 plan\_hints 为 STREAMING 的 Insert into 导入直接根据 sql 执行的返回码确定导入是否成功,除了 query 返回 OK 以外,其他均为导入失败。 ++ VALUES + + 用户可以通过 VALUES 语法插入一条或者多条数据。 + + *注意:VALUES 方式仅适用于导入几条数据作为导入 DEMO 的情况,完全不适用于任何测试和生产环境。Doris 系统本身也不适合单条数据导入的场景。建议使用 INSERT INTO SELECT 的方式进行批量导入。* + +### 导入结果 -未指定 plan\_hints 的 Insert into 导入如果导入失败,也是返回 sql 执行失败,如果导入成功还会附加返回一个 Label 字段。 +Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令的返回行为。 -+ Label +如果导入失败,则返回语句执行失败。如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。 - 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。Insert into 的 label 则是由系统生成的,用户可以拿着这个 label 通过查询导入命令异步获取导入状态。 - - *注意:只有不指定 plan\_hints 的 Insert into 才会返回 Label 参数* - -# Insert into 系统配置 -## FE conf 中的配置 +Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 + +## 相关系统配置 + +### FE 配置 + timeout - 导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。 - - 目前 Insert into 并不支持自定义导入的 timeout 时间,所有 Insert into 导入的超时时间是统一的,默认的 timeout 时间为1小时。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```insert_load_default_timeout_second```。 - -## Session variable 中的配置 + 导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。 + + 目前 Insert Into 并不支持自定义导入的 timeout 时间,所有 Insert Into 导入的超时时间是统一的,默认的 timeout 时间为1小时。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```insert_load_default_timeout_second```。 + + 同时 Insert Into 语句收到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。 + +### Session 变量 + enable\_insert\_strict - Insert into 导入本身不能设置 ```max_filter_ratio ```,这个参数用来代替错误率限制的功能,可以通过 ``` session variable ``` 设置,默认值为 false。当该参数设置为 false 时,则和```max_filter_ratio = 1 ``` 意义相同。如果该参数设置为 true 时,则和```max_filter_ratio = 0 ``` 意义相同。 - -# Insert into 最佳实践 -## 应用场景 -1. 用户希望仅导入几条假数据,验证一下 Doris 系统的功能。此时适合使用 Insert into VALUS 的语法。 -2. 用户希望将已经在 Doris 系统中的数据进行 ETL 转换并导入到一个新的 Doris 表中,此时适合使用 Insert into query\_stmt。 + Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。当该参数设置为 false 时,表示如果有一条数据错误,则导入失败。默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。 + ++ query\_timeout -除以上两种情况外,其他情况并不适合使用 Insert into + Insert Into 本身也是一个 SQL 命令,因此 Insert Into 语句也受到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。 + +## 最佳实践 -## 数据量 -Insert into 对数据量没有限制,大数据量导入也可以支持。但 Insert into 有默认的超时时间,用户预估的导入数据量过大,就需要修改系统的 Insert into 导入超时时间。 +### 应用场景 +1. 用户希望仅导入几条假数据,验证一下 Doris 系统的功能。此时适合使用 INSERT INTO VALUS 的语法。 +2. 用户希望将已经在 Doris 表中的数据进行 ETL 转换并导入到一个新的 Doris 表中,此时适合使用 INSERT INTO SELECT 语法。 +3. 用户可以创建一种外部表,如 MySQL 外部表映射一张 MySQL 系统中的表。或者创建 Broker 外部表来映射 HDFS 上的数据文件。然后通过 INSERT INTO SELECT 语法将外部表中的数据导入到 Doris 表中存储。 + +### 数据量 +Insert Into 对数据量没有限制,大数据量导入也可以支持。但 Insert Into 有默认的超时时间,用户预估的导入数据量过大,就需要修改系统的 Insert Into 导入超时时间。 ``` 导入数据量 = 36G 约≤ 3600s * 10M/s 其中 10M/s 是最大导入限速,用户需要根据当前集群情况计算出平均的导入速度来替换公式中的 10M/s ``` -## 完成例子 -用户有一张表 store_sales 在数据库 sales 中,用户又创建了一张表叫 bj_store_sales 也在数据库 sales 中,用户希望将 store_sales 中销售记录在 bj 的数据导入到这张新建的表 bj_store_sales 中。导入的数据量约为:10G。 +### 完整例子 + +用户有一张表 store\_sales 在数据库 sales 中,用户又创建了一张表叫 bj\_store\_sales 也在数据库 sales 中,用户希望将 store\_sales 中销售记录在 bj 的数据导入到这张新建的表 bj\_store\_sales 中。导入的数据量约为:10G。 ``` store_sales schema: @@ -106,23 +110,28 @@ bj_store_sales schema: 集群情况:用户当前集群的平均导入速度约为 5M/s -+ Step1: 判断是否要修改 Insert into 的默认超时时间 ++ Step1: 判断是否要修改 Insert Into 的默认超时时间 - ``` - 计算导入的大概时间 - 10G / 5M/s = 2000s - - 修改 FE 配置 - insert_load_default_timeout_second = 2000 - ``` - + ``` + 计算导入的大概时间 + 10G / 5M/s = 2000s + + 修改 FE 配置 + insert_load_default_timeout_second = 2000 + ``` + + Step2:创建导入任务 - 由于用户是希望将一张表中的数据做 ETL 并导入到目标表中,所以应该使用 Insert into query\_stmt 方式导入。 + 由于用户是希望将一张表中的数据做 ETL 并导入到目标表中,所以应该使用 Insert into query\_stmt 方式导入。 - ``` - insert into bj_store_sales select id, total, user_id, sale_timestamp from store_sales where region = "bj"; - ``` + ``` + INSERT INTO bj_store_sales SELECT id, total, user_id, sale_timestamp FROM store_sales where region = "bj"; + ``` +## 常见问题 +* 查看错误行 + 由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。但无论以上哪种情况,Doris 目前无法提供查看不合格数据行的功能。因此用户无法通过 Insert Into 语句来查看具体的导入错误。 + + 错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。当依然无法检查出问题时。目前只能建议先运行 Insert Into 语句中的 SELECT 命令将数据导出到一个文件中,然后在通过 Stream load 的方式导入这个文件,来查看具体的错误。 diff --git a/docs/documentation/cn/administrator-guide/load-data/load-manual.md b/docs/documentation/cn/administrator-guide/load-data/load-manual.md index 8823d72051..6d8bff4eef 100644 --- a/docs/documentation/cn/administrator-guide/load-data/load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/load-manual.md @@ -1,176 +1,171 @@ -# 导入 +# 导入总览 + 导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过 Mysql 客户端查询数据。 -本文档主要总体介绍导入简单原理,目前支持的几种导入方式,以及最佳实践。具体各个导入的操作手册请参阅各个导入的文档。 +Doris 支持多种导入方式。建议先完整阅读本文档,再根据所选择的导入方式,查看各自导入方式的详细文档。 -# 基本概念 +## 基本概念 -1. 导入任务(Load job):导入任务读取用户提交的源数据,转换或清洗后,将数据导入到 Doris 系统中。导入完成后,数据即可被用户查询到。 +1. Frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入规划生成和导入任务的调度工作。 2. Backend(BE):Doris 系统的计算和存储节点。在导入流程中主要负责数据的 ETL 和存储。 -3. Frontend(FE):Doris 系统的元数据和调度节点。在导入流程中主要负责导入规划生成和导入任务的调度工作。 -4. Palo:Doris 系统在百度云上企业版售卖的名称。 -5. HDFS: Hadoop 分布式文件系统。用户的待导入数据可能存放的位置。 -6. BOS:百度云提供的对象存储系统,百度云上的 Palo 用户的待导入数据可能存放的位置。 -7. AFS:百度内部提供的超大规模文件系统,百度厂内的 Palo 用户的待导入数据可能存放的位置。 -8. Database(DB):Doris 系统中的 database。 +3. Broker:Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统中文件的能力。 +4. 导入作业(Load job):导入作业读取用户提交的源数据,转换或清洗后,将数据导入到 Doris 系统中。导入完成后,数据即可被用户查询到。 +5. Label:所有导入作业都有一个 Label。Label 在一个数据库内唯一,可由用户指定或系统自动生成,用于标识一个导入作业。相同的 Label 仅可用于一个成功的导入作业。 +6. MySQL 协议/HTTP 协议:Doris 提供两种访问协议接口。 MySQL 协议和 HTTP 协议。部分导入方式使用 MySQL 协议接口提交作业,部分导入方式使用 HTTP 协议接口提交作业。 -# 基本原理 -## 导入执行流程 +## 导入方式 + +为适配不同的数据导入需求,Doris 系统提供了5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。 + +所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 数据格式。 + +每个导入方式的说明请参阅单个导入方式的操作手册。 + +* Broker load + + 通过 Broker 进程访问并读取外部数据源(如 HDFS)导入到 Doris。用户通过 Mysql 协议提交导入作业后,异步执行。通过 `SHOW LOAD` 命令查看导入结果。 + +* Stream load + + 用户通过 HTTP 协议提交请求并携带原始数据创建导入。主要用于快速将本地文件或数据流中的数据导入到 Doris。导入命令同步返回导入结果。 + +* Insert + + 类似 MySQL 中的 Insert 语句,Doris 提供 `INSERT INTO tbl SELECT ...;` 的方式从 Doris 的表中读取数据并导入到另一张表。或者通过 `INSERT INTO tbl VALUES(...);` 插入单条数据。 + +* Multi load + + 用户通过 HTTP 协议提交多个导入作业。Multi Load 可以保证多个导入作业的原子生效。 + +* Routine load + + 用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如 Kafka)中读取数据并导入到 Doris 中。 + +## 基本原理 + +### 导入执行流程 ``` +---------+ +---------+ +----------+ +-----------+ | | | | | | | | -|PENDING +----->+ETL +----->+LOADING +----->+FINISHED | +| PENDING +----->+ ETL +----->+ LOADING +----->+ FINISHED | | | | | | | | | +---------+ +---+-----+ +----+-----+ +-----------+ - | | - | | - | | - | | +-----------+ - | | | | - +-----------------+------------>CANCELLED | + | | | + | | | + | | | + | | | +-----------+ + | | | | | + +---------------+-----------------+------------> CANCELLED | | | +-----------+ ``` -如上图,导入流程主要经过上面4个阶段。其中 PENDING 和 ETL 阶段不是必须的阶段。 +如上图,一个导入作业主要经过上面4个阶段。 -+ PENDING: 该阶段只有 Broker load 才有。Broker load 被用户提交后会短暂停留在这个阶段,直到被 FE 中的 Scheduler 调度。 其中 Scheduler 的调度间隔为5秒。 ++ PENDING(非必须): 该阶段只有 Broker Load 才有。Broker Load 被用户提交后会短暂停留在这个阶段,直到被 FE 中的 Scheduler 调度。 其中 Scheduler 的调度间隔为5秒。 -+ ETL: 该阶段在版本0.11.0(包含)之前存在,主要是用于将原始数据按照用户声明的 transform 方式进行变换,并且过滤不满足条件的原始数据。在 0.11.0 后的版本,ETL 阶段不再存在,其中数据 transform 的工作被合并到 LOADING 阶段。 - - -+ LOADING: 该阶段在版本0.11.0(包含)之前主要用于将变换后的数据推到对应的 BE 存储中。在0.11.0后的版本,该阶段先对数据进行 ETL 清洗和变换,然后将数据发送到 BE 存储中。当所有导入数据均完成导入后,进入等待生效过程,此时 Load job 依旧是 LOADING。 ++ ETL(非必须): 该阶段在版本 0.10.0(包含) 之前存在,主要是用于将原始数据按照用户声明的方式进行变换,并且过滤不满足条件的原始数据。在 0.10.0 后的版本,ETL 阶段不再存在,其中数据 transform 的工作被合并到 LOADING 阶段。 ++ LOADING: 该阶段在版本 0.10.0(包含)之前主要用于将变换后的数据推到对应的 BE 存储中。在 0.10.0 后的版本,该阶段先对数据进行清洗和变换,然后将数据发送到 BE 存储中。当所有导入数据均完成导入后,进入等待生效过程,此时 Load job 依旧是 LOADING。 + FINISHED: 在 Load job 涉及的所有数据均生效后,Load job 的状态变成 FINISHED。FINISHED 后导入的数据均可查询。 - -+ CANCELLED: 在 ETL 或者 LOADING 的过程中,如果发生数据质量不合格等问题,Load Job 会被系统 cancel,本次导入会全部失败。当然,用户也可以手动取消 Load Job。CANCELLED 也是 Load Job 的最终状态,不可被再次执行。 - ++ CANCELLED: 在作业 FINISH 的之前,作业都可能被取消并进入 CANCELLED 状态。如用户手动取消,或导入出现错误等。CANCELLED 也是 Load Job 的最终状态,不可被再次执行。 上述阶段,除了 PENDING 到 LOADING 阶段是 Scheduler 轮训调度的,其他阶段之前的转移都是回调机制实现。 -# 导入方式 +### Label 和 原子性 -为适配不同的数据导入需求, Doris 系统提供了5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 数据格式。 +Doris 对所有导入方式提供原子性保证。既保证同一个导入作业内的数据,原子生效。不会出现仅导入部分数据的情况。 -每个导入方式的说明见单个导入方式的操作手册。此处只简单介绍并讲解如何选择。 +同时,每一个导入作业都有一个由用户指定或者系统自动生成的 Label。Label 在一个 Database 内唯一。当一个 Label 对应的导入作业成功够,不可在重复使用该 Label 提交导入作业。如果 Label 对应的导入作业失败,则可以重复使用。 -1. Broker load: 是一种异步类型的导入方式,主要支持数据源有 HDFS, BOS, AFS 这三种。用户可通过 mysql client 创建导入。 -2. Stream load:是一种同步类型的导入方式,用户提交 HTTP 请求并携带原始数据创建导入。 - + Mini load:Mini load 和 Stream load 一样,但从功能上并不支持列的函数变换等。建议直接使用 Stream load 即可。 -3. Insert into: 是一种同步类型的导入方式,用户可以指定将已经存在在 Doris 系统中的数据导入到另一个 Table 中。用户可通过 Mysql client 创建导入。 -4. Multi load:是一种异步类型的导入方式,在导入创建后可接收多次 Mini load 导入,并且原子性生效同一个 Multi load中的导入数据。 -5. Routine load:流式导入方式,用户指定流式数据源后,Doris 会循环不断从数据源中读取新数据并导入。目前支持的流式数据源为 Kafka。 +用户可以通过 Label 机制,来保证 Label 对应的数据最多被导入一次,级 At-Most-Once 语义。 -## 如何选择合适的导入方式 -第一原则:根据数据源所在位置选择导入方式。比如:如果用户的原始数据存放在 HDFS 上,则应该选择 Broker load 导入方式。 -# 基本操作 -这里主要介绍了导入的几种基本操作,以及基本操作的使用流程。用户在确定了导入方式后,可查看各个导入方式的操作手册获取更详细的操作参数意义。 +## 同步和异步 -## 创建导入 -用户提交导入任务,并且设置导入任务的参数。用户提交导入任务后,根据不同的导入方式,导入任务会被异步或同步的执行。 - -同步执行的导入比如:Stream load,任务会直接进入LOADING阶段。异步执行的导入方式比如: Broker load,任务会先进入PENDING阶段。(在导入操作使用流程中会详细解释同步和异步的区别) - -## 查看导入 -用户提交成功导入后,可通过查看导入命令获取导入状态,以及详细信息。对于异步的导入方式来说,用户必须在提交导入后,异步查看导入进度。在 SHOW LOAD 中导入状态改为 FINISHED 时,导入才算完成。用户也可以通过该方式查看导入失败的原因,以及导入的各项指标。 - -执行 ``` HELP SHOW LOAD``` 可查看查看导入的语法帮助。 - -## 取消导入 -用户可以通过指定待取消的任务任务的 Label,来手动取消导入任务。导入任务被取消后,导入任务不会导入任何数据。只有未完成的导入任务才能被手动取消。 - -执行 ```HELP CANCEL LOAD``` 查看取消导入的语法。 - -一般情况下,提交的导入配置错误或者导入消耗时间过长,都可以使用这个命令取消导入。 - -## 导入操作使用流程 Doris 目前的导入方式分为两类,同步和异步。如果是外部程序接入 Doris 的导入功能,需要判断使用导入方式是哪类再确定接入逻辑。 ### 同步 + 同步导入方式既用户创建导入任务,Doris 同步执行导入,执行完成后返回用户导入结果。用户可直接根据创建导入任务命令返回的结果同步判断导入是否成功。 -同步类型的导入方式有: Stream load , Mini load, Insert into。 +同步类型的导入方式有: **Stream load**,**Insert**。 操作步骤: -1. 用户(外部系统)创建导入任务 -2. Doris 返回导入结果 +1. 用户(外部系统)创建导入任务。 +2. Doris 返回导入结果。 3. 用户(外部系统)判断导入结果,如果失败可以再次提交导入任务。 *注意:如果用户使用的导入方式是同步返回的,且导入的数据量过大,则创建导入请求可能会花很长时间才能返回结果。* ### 异步 -异步导入方式既用户创建导入任务后,Doris 直接返回创建成功。导入任务会被异步执行,用户需要再发查看导入请求来确定导入是否完成。 +异步导入方式既用户创建导入任务后,Doris 直接返回创建成功。**创建成功不代表数据已经导入**。导入任务会被异步执行,用户在创建成功后,需要通过轮询的方式发送查看命令查看导入作业的状态。如果创建失败,则可以根据失败信息,判断是否需要再次创建。 -异步类型的导入方式有:Broker load,Multi load。 +异步类型的导入方式有:**Broker load**,**Multi load**。 操作步骤: -1. 用户(外部系统)创建导入任务 -2. Doris 返回导入创建结果 +1. 用户(外部系统)创建导入任务。 +2. Doris 返回导入创建结果。 3. 用户(外部系统)判断导入创建结果,成功则进入4,失败回到重试创建导入,回到1。 -4. 用户(外部系统)轮训查看导入任务,直到 state 变为 FINISHED 或 CANCELLED。 - -**强调:异步类型的导入,必须通过异步查看导入任务来确定导入是否成功。仅仅创建导入成功,并不代表导入完成** +4. 用户(外部系统)轮询查看导入任务,直到状态变为 FINISHED 或 CANCELLED。 ### 注意事项 无论是异步还是同步的导入类型,都不应该在 Doris 返回导入失败或导入创建失败后,无休止的重试。**外部系统在有限次数重试并失败后,保留失败信息,大部分多次重试均失败问题都是使用方法问题或数据本身问题。** -# 最佳实践 +## 最佳实践 + 用户在接入 Doris 导入时,一般会采用程序接入的方式,这样可以保证数据被定期的导入到 Doris 中。下面主要说明了程序接入 Doris 的最佳实践。 -1. 选择合适的导入方式:根据数据源所在位置选择导入方式。例如:如果原始数据存放在 BOS 或 HDFS 上,则使用 Broker load导入。 -2. 确定导入方式的协议:如果选择了 Broker load 导入方式,则外部系统需要能使用 Mysql 协议定期提交任务。 -3. 确定导入方式的类型:导入方式为同步或异步。比如 Broker load 为异步导入方式,则外部系统在提交创建导入后,必须调用 查看导入命令,根据查看导入命令的结果来判断导入是否成功。 -4. 制定 label 生成策略:label 生成策略需满足,每一批次数据唯一且固定的原则。这样 Doris 就可以保证 at most once。 -5. 程序自身保证 at least once:外部系统需要保证自身的 at least once,这样就可以导入流程的 exactly once。 +1. 选择合适的导入方式:根据数据源所在位置选择导入方式。例如:如果原始数据存放在 HDFS 上,则使用 Broker load 导入。 +2. 确定导入方式的协议:如果选择了 Broker load 导入方式,则外部系统需要能使用 MySQL 协议定期提交和查看导入作业。 +3. 确定导入方式的类型:导入方式为同步或异步。比如 Broker load 为异步导入方式,则外部系统在提交创建导入后,必须调用查看导入命令,根据查看导入命令的结果来判断导入是否成功。 +4. 制定 Label 生成策略:Label 生成策略需满足,每一批次数据唯一且固定的原则。这样 Doris 就可以保证 At-Most-Once。 +5. 程序自身保证 At-Least-Once:外部系统需要保证自身的 At-Least-Once,这样就可以保证导入流程的 Exactly-Once。 + +## 通用系统配置 -# 导入通用系统配置 下面主要解释了几个所有导入方式均通用的系统级别的配置。 -## FE conf 中的配置 -下面两个配置属于 FE 的系统配置,可以通过修改 FE 的配置文件 ```fe.conf``` 来修改配置。 +### FE 配置 + +以下配置属于 FE 的系统配置,可以通过修改 FE 的配置文件 ```fe.conf``` 来修改配置。 + max\_load\_timeout\_second 和 min\_load\_timeout\_second - - 这两个配置含义为:最大的导入超时时间,最小的导入超时时间,以秒为单位。默认的最大超时时间为3天, 默认的最小超时时间为1秒。用户自定义的导入超时时间不可超过这个范围。该参数通用于所有的导入方式。 - - 该配置对应的 ```fe.conf```中的参数名为 ```max_load_timeout_second, min_load_timeout_second ```。 - + + 这两个配置含义为:最大的导入超时时间,最小的导入超时时间,以秒为单位。默认的最大超时时间为3天, 默认的最小超时时间为1秒。用户自定义的导入超时时间不可超过这个范围。该参数通用于所有的导入方式。 + + desired\_max\_waiting\_jobs - 在等待队列中的导入任务个数最大值,默认为100。当在 FE 中处于 PENDING 状态(也就是等待执行的)导入个数超过该值,新的导入请求则会被拒绝。 - - 此配置仅对异步执行的导入有效,当异步执行的导入等待个数超过默认值,则会在创建导入请求的时候被拒绝。 - - 该配置对应的 ```fe.conf```中的参数名为 ```desired_max_waiting_jobs```。 - + 在等待队列中的导入任务个数最大值,默认为100。当在 FE 中处于 PENDING 状态(也就是等待执行的)导入个数超过该值,新的导入请求则会被拒绝。 + + 此配置仅对异步执行的导入有效,当异步执行的导入等待个数超过默认值,则后续的创建导入请求会被拒绝。 + + max\_running\_txn\_num\_per\_db - 这个配置的含义是说,每个 DB 中正在运行的导入最大个数(不区分导入类型,统一计数)。当当前 DB 正在运行的导入个数超过最大值时,后续的导入则会直接被系统取消。 - - 对于同步执行的导入来说,如果当前 DB 的正在运行导入个数超过最大值,则创建导入请求就会返回失败。 - - 对于异步执行的导入来说,如果当前 DB 的正在运行导入个数超过最大值,则会在导入被正式调度到(也就是从 PENDING 状态准备改为 LOADING 状态)的时候被系统异步取消。 - - 该配置对应的是 ```fe.conf``` 中的参数名为 ``` max_running_txn_num_per_db ```。 + 这个配置的含义是说,每个 Database 中正在运行的导入最大个数(不区分导入类型,统一计数)。当当前 Database 正在运行的导入个数超过最大值时,后续的导入不会被执行。如果是同步导入作业,则导入会被拒绝。如果是异步导入作业。则作业会在队列中等待。 + +### BE 配置 +以下配置属于 BE 的系统配置,可以通过修改 BE 的配置文件 ```be.conf``` 来修改配置。 ++ push\_write\_mbytes\_per\_sec + BE 上单个 Tablet 的写入速度限制。默认是 10,即 10MB/s。通常 BE 对单个 Tablet 的最大写入速度,根据 Schema 以及系统的不同,大约在 10-30MB/s 之间。可以适当调整这个参数来控制导入速度。 + ++ write\_buffer\_size + 导入数据在 BE 上会先写入一个 memtable,memtable 达到阈值后才会写回磁盘。默认大小是 100MB。过小的阈值可能导致 BE 上存在大量的小文件。可以适当提高这个阈值减少文件数量。但过大的阈值可能导致 RPC 超时,见下面的配置说明。 + ++ tablet\_writer\_rpc\_timeout\_sec + 导入过程中,发送一个 Batch(1024行)的 RPC 超时时间。默认 600 秒。因为该 RPC 可能涉及多个 memtable 的写盘操作,所以可能会因为写盘导致 RPC 超时,可以适当调整这个超时时间来减少超时错误(如 `send batch fail` 错误)。同时,如果调大 `write_buffer_size` 配置,也需要适当调大这个参数。 - - - - - - - - ++ streaming\_load\_rpc\_max\_alive\_time\_sec + 在导入过程中,Doris 会为每一个 Tablet 开启一个 Writer,用于接收数据并写入。这个参数指定了 Writer 的等待超时时间。如果在这个时间内,Writer 没有收到任何数据,则 Writer 会被自动销毁。当系统处理速度较慢时,Writer 可能长时间接收不到下一批数据,导致导入报错:`TabletWriter add batch with unknown id`。此时可适当增大这个配置。默认为 600 秒。 diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md index 2642285e10..f25b2477d4 100644 --- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md @@ -1,4 +1,4 @@ -# 例行导入 +# Routine Load 例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。 diff --git a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md index 33fa4818be..444fdace32 100644 --- a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md @@ -1,224 +1,262 @@ # Stream load -Stream load 是一个同步的导入方式,用户通过发送 HTTP 请求将本地或内存中文件导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。 -Stream load 主要适用于待导入文件在发送导入请求端,或可被发送导入请求端获取的情况。 +Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。 -# 基本原理 -Stream load 是单机版本的导入,整个导入过程中参与 ETL 的节点只有一个 BE。由于导入后端是流式的所以并不会受单个 BE 的内存限制影响。 +Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。 + +## 基本原理 下图展示了 Stream load 的主要流程,省略了一些导入细节。 ``` -^ + -| | -| | user creates stream load -| 5. BE returns | -| the result of v -| stream load +---+-----+ -| | | -| |FE | -| | | -| ++-+------+ -| 1.FE redirects| ^ 2. BE fetches the plan -| request to be | | of stream load -| | | ^ -| | | | 4. BE wants to effect -| v | | the load data -| ++-+---+--+ -| | +------+ -+--------------+BE | | 3. BE executes plan - | +<-----+ - +---------+ + ^ + + | | + | | 1A. User submit load to FE + | | + | +--v-----------+ + | | FE | +5. Return result to user | +--+-----------+ + | | + | | 2. Redirect to BE + | | + | +--v-----------+ + +---+Coordinator BE| 1B. User submit load to BE + +-+-----+----+-+ + | | | + +-----+ | +-----+ + | | | 3. Distrbute data + | | | + +-v-+ +-v-+ +-v-+ + |BE | |BE | |BE | + +---+ +---+ +---+ ``` -# 基本操作 -## 创建导入 -Stream load 创建导入语句 +Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。 + +用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。 + +导入的最终结果由 Coordinator BE 返回给用户。 + +## 基本操作 +### 创建导入 + +Stream load 通过 HTTP 协议提交和传输数据。这里通过 `curl` 命令展示如何提交导入。 + +用户也可以通过其他 HTTP client 进行操作。 ``` -语法: curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load Header 中支持如下属性: label, column_separator, columns, where, max_filter_ratio, partitions 格式为: -H "key1:value1" +``` 示例: -curl --location-trusted -u root -T date -H "label:123" http://abc.com:8888/api/test/date/_stream_load - ``` -创建导入的详细语法帮助执行 ``` HELP STREAM LOAD;``` 查看, 下面主要介绍创建 Stream load 的部分参数意义。 +curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load +``` +创建导入的详细语法帮助执行 ```HELP STREAM LOAD``` 查看, 下面主要介绍创建 Stream load 的部分参数意义。 -### 签名参数 +#### 签名参数 + user/passwd - Stream load 由于创建导入的协议使用的是 HTTP 协议,所以需要通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。 - -### 导入任务参数 + Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。 + +#### 导入任务参数 Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。 + label - 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。 - - label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的label。这样同一批次数据的重复请求只会被接受一次,保证了 At most once** - - 当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。 - + 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。 + + label 的另一个作用,是防止用户重复导入相同的数据。**强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once** + + 当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。 + + max\_filter\_ratio - 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的 filter ratio 超过该值,则导入失败。计算公式为: ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )> max_filter_ratio ``` - - ``` dpp.abnorm.ALL ``` 在代码中也叫 ``` num_rows_filtered``` 指的是导入过程中被过滤的错误数据。比如:列在表中为非空列,但是导入数据为空则为错误数据。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的错误数据量。 - - ``` dpp.norm.ALL ``` 指的是导入过程中正确数据的条数。可以通过 ``` SHOW LOAD ``` 命令查询导入任务的正确数据量。 - - ``` num_rows_unselected ``` 导入过程中被 where 条件过滤掉的数据量。过滤的数据量不参与容忍率的计算。 - - ``` 原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL + num_rows_unselected ``` - + 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。 + + 如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 + + 计算公式为: + + ``` (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio ``` + + ```dpp.abnorm.ALL``` 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。 + + ```dpp.norm.ALL``` 指的是导入过程中正确数据的条数。可以通过 ```SHOW LOAD``` 命令查询导入任务的正确数据量。 + + 原始文件的行数 = `dpp.abnorm.ALL + dpp.norm.ALL` + + where - 导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参数 filter ratio 的计算,但会被计入``` num_rows_unselected ```。 - + 导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入```num_rows_unselected```。 + + partition - 待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 ```dpp.abnorm.ALL ``` - + 待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 ```dpp.abnorm.ALL ``` + + columns - 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 - - ``` - 列顺序变换例子:原始数据有两列,目前表也有两列(c1,c2)但是原始文件的第一列对应的是目标表的c2列, 而原始文件的第二列对应的是目标表的c1列,则写法如下: - columns: c2,c1 - - 表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下: - columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = mouth(tmp_c2) - 其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。 - - ``` - -### 创建导入操作返回结果 + 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 + + ``` + 列顺序变换例子:原始数据有两列,目前表也有两列(c1,c2)但是原始文件的第一列对应的是目标表的c2列, 而原始文件的第二列对应的是目标表的c1列,则写法如下: + columns: c2,c1 + + 表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下: + columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = mouth(tmp_c2) + 其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。 + ``` + +### 返回结果 由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。 +示例: + +``` +{ + "TxnId": 1003, + "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 1000000, + "NumberLoadedRows": 1000000, + "NumberFilteredRows": 1, + "NumberUnselectedRows": 0, + "LoadBytes": 40888898, + "LoadTimeMs": 2144, + "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" +} +``` + 下面主要解释了 Stream load 导入结果参数: -+ NumberLoadedRows ++ TxnId:导入的事务ID。用户可不感知。 - 和```dpp.norm.ALL ``` 含义相同,显示的是本次导入成功的条数。 - -+ NumberFilteredRows ++ Label:导入 Label。由用户指定或系统自动生成。 + ++ Status:导入完成状态。 + + "Success":表示导入成功。 + + "Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。 + + "Label Already Exists":Label 重复,需更换 Label。 + + "Fail":导入失败。 - 和 ```dpp.abnorm.ALL ``` 含义相同,显示的是本次导入被过滤的错误条数。 - -+ NumberUnselectedRows ++ Message:导入错误信息。 - 显示的是本地导入被 where 条件过滤的条数。 ++ NumberTotalRows:导入总处理的行数。 ++ NumberLoadedRows:成功导入的行数。 -*注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。 ++ NumberFilteredRows:数据质量不合格的行数。 + ++ NumberUnselectedRows:被 where 条件过滤的行数。 + ++ LoadBytes:导入的字节数。 + ++ LoadTimeMs:导入完成时间。单位毫秒。 + ++ ErrorURL:如果有数据质量问题,通过访问这个 URL 查看具体错误行。 + +> 注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。 + +### 取消导入 -## 取消导入 用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。 -# Stream load 导入系统配置 -## FE conf 中的配置 +## 相关系统配置 -+ timeout +### FE 配置 - 导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。 - - 目前 Stream load 并不支持自定义导入的 timeout 时间,所有 Stream load 导入的超时时间是统一的,默认的 timeout 时间为300秒。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```stream_load_default_timeout_second```。 - -## BE conf 中的配置 ++ stream\_load\_default\_timeout\_second + + 导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。 + + 目前 Stream load 并不支持自定义导入的 timeout 时间,所有 Stream load 导入的超时时间是统一的,默认的 timeout 时间为300秒。如果导入的源文件无法再规定时间内完成导入,则需要调整 FE 的参数```stream_load_default_timeout_second```。 + +### BE 配置 + streaming\_load\_max\_mb - Stream load 的最大导入大小,默认为 10G,单位是 Mb。如果用户的原始文件超过这个值,则需要调整 BE 的参数 ```streaming_load_max_mb```。 - -# Stream load 最佳实践 -## 应用场景 -使用 stream load 的最合适场景就是原始文件在内存中,或者在磁盘中。其次,由于 Stream load 是一种同步的导入方式,所以用户如果希望用同步方式获取导入结果,也可以使用这种导入。 + Stream load 的最大导入大小,默认为 10G,单位是 MB。如果用户的原始文件超过这个值,则需要调整 BE 的参数 ```streaming_load_max_mb```。 + +## 最佳实践 -## 数据量 -由于 Stream load 的原理是由 BE 发起的导入,所以 Stream load 都是单机执行的,导入数据量在1 Byte ~ 20G 均可。由于默认的最大 Stream load 导入数据量为 10G,所以如果要导入超过 10G 的文件需要修改 BE 的配置 ```streaming_load_max_mb``` +### 应用场景 + +使用 Stream load 的最合适场景就是原始文件在内存中,或者在磁盘中。其次,由于 Stream load 是一种同步的导入方式,所以用户如果希望用同步方式获取导入结果,也可以使用这种导入。 + +### 数据量 + +由于 Stream load 的原理是由 BE 发起的导入并分发数据,建议的导入数据量在 1G 到 10G 之间。由于默认的最大 Stream load 导入数据量为 10G,所以如果要导入超过 10G 的文件需要修改 BE 的配置 ```streaming_load_max_mb``` ``` 比如:待导入文件大小为15G 修改 BE 配置 streaming_load_max_mb 为 16000 即可。 ``` -+ 数据量过大导致超时 - - Stream load 的默认超时为 300秒,按照 Doris 目前最大的导入限速来看,约超过 3G 的文件就需要修改导入任务默认超时时间了。 - - ``` - 导入任务超时时间 = 导入数据量 / 10M/s (具体的平均导入速度需要用户根据自己的集群情况计算) - 例如:导入一个 10G 的文件 - timeout = 1000s 等于 10G / 10M/s - ``` - -## 完整例子 +Stream load 的默认超时为 300秒,按照 Doris 目前最大的导入限速来看,约超过 3G 的文件就需要修改导入任务默认超时时间了。 + +``` +导入任务超时时间 = 导入数据量 / 10M/s (具体的平均导入速度需要用户根据自己的集群情况计算) +例如:导入一个 10G 的文件 +timeout = 1000s 等于 10G / 10M/s +``` + +### 完整例子 数据情况: 数据在发送导入请求端的本地磁盘路径 /home/store_sales 中,导入的数据量约为 15G,希望导入到数据库 bj_sales 的表 store_sales 中。 集群情况:Stream load 的并发数不受集群大小影响。 + step1: 导入文件大小是否超过默认的最大导入大小10G - ``` - 修改 BE conf - streaming_load_max_mb = 16000 - ``` + ``` + 修改 BE conf + streaming_load_max_mb = 16000 + ``` + step2: 计算大概的导入时间是否超过默认 timeout 值 - ``` - 导入时间 ≈ 15000 / 10 = 1500s - 超过了默认的 timeout 时间,需要修改 FE 的配置 - stream_load_default_timeout_second = 1500 - - ``` - + ``` + 导入时间 ≈ 15000 / 10 = 1500s + 超过了默认的 timeout 时间,需要修改 FE 的配置 + stream_load_default_timeout_second = 1500 + ``` + + step3:创建导入任务 - ``` - curl --location-trusted -u root:password -T /home/store_sales -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load - ``` + ``` + curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load + ``` +## 常见问题 -# 常见问题 +* Label Already Exists -## Label Already Exists + Stream load 的 Label 重复排查步骤如下: -Stream load 的 Label 重复排查步骤如下: + 1. 是否和其他导入方式已经存在的导入 Label 冲突: -1. 是否和其他导入方式已经存在的导入 Label 冲突: - - 由于 Doris 系统中导入的 Label 不区分导入方式,所以存在其他导入方式使用了相同 Label 的问题。 - - 通过 ``` SHOW LOAD WHERE LABEL = “xxx”```,其中 xxx 为重复的 Label 字符串,查看是否已经存在一个 FINISHED 导入的 Label 和用户申请创建的 Label 相同。 - -2. 是否 Stream load 同一个创建任务被重复提交了 - - 由于 Stream load 是 HTTP 协议提交创建导入任务,一般各个语言的 HTTP Client 均会自带请求重试逻辑。Doris 系统在接受到第一个请求后,已经开始操作 Stream load,但是由于没有及时返回给 Client 端结果, Client 端会发生再次重试创建请求的情况。这时候 Doris 系统由于已经在操作第一个请求,所以第二个请求已经就会被报 Label Already Exists 的情况。 - - 排查上述可能的方法:使用 Label 搜索 FE Master 的日志,看是否存在同一个 Label 出现了两次 ```redirect load action to destination= ``` 的情况。如果有就说明,请求被 Client 端重复提交了。 - - 改进方法: - + 建议用户根据当前请求的数据量,计算出大致导入的时间,并根据导入超时时间改大 Client 端的请求超时时间,避免请求被 Client 端多次提交。 - -# Mini load - -Mini load 支持的数据源,导入方式以及支持的 ETL 功能是 Stream load 的子集,且导入的效率和 Stream load 一致。**强烈建议使用 Stream load 替代 Mini load** - -唯一不同的是,Mini load 由于为了兼容旧版本的使用习惯,用户不仅可以根据创建请求的返回值查看导入状态,也可以异步的通过查看导入命令查看导入状态。 - -Mini load的创建语法帮助可执行 ``` HELP MINI LOAD```。 + 由于 Doris 系统中导入的 Label 不区分导入方式,所以存在其他导入方式使用了相同 Label 的问题。 + + 通过 ```SHOW LOAD WHERE LABEL = “xxx”```,其中 xxx 为重复的 Label 字符串,查看是否已经存在一个 FINISHED 导入的 Label 和用户申请创建的 Label 相同。 + + 2. 是否 Stream load 同一个作业被重复提交了 + 由于 Stream load 是 HTTP 协议提交创建导入任务,一般各个语言的 HTTP Client 均会自带请求重试逻辑。Doris 系统在接受到第一个请求后,已经开始操作 Stream load,但是由于没有及时返回给 Client 端结果, Client 端会发生再次重试创建请求的情况。这时候 Doris 系统由于已经在操作第一个请求,所以第二个请求已经就会被报 Label Already Exists 的情况。 + + 排查上述可能的方法:使用 Label 搜索 FE Master 的日志,看是否存在同一个 Label 出现了两次 ```redirect load action to destination= ``` 的情况。如果有就说明,请求被 Client 端重复提交了。 + + 建议用户根据当前请求的数据量,计算出大致导入的时间,并根据导入超时时间改大 Client 端的请求超时时间,避免请求被 Client 端多次提交。 diff --git a/docs/documentation/cn/sql-reference/sql-functions/date-time-functions/timediff.md b/docs/documentation/cn/sql-reference/sql-functions/date-time-functions/timediff.md index 1a5006e488..d45e6b4ef3 100644 --- a/docs/documentation/cn/sql-reference/sql-functions/date-time-functions/timediff.md +++ b/docs/documentation/cn/sql-reference/sql-functions/date-time-functions/timediff.md @@ -6,11 +6,11 @@ ## Description -TIMEDIFFDATETIME֮IJֵ +TIMEDIFF返回两个DATETIME之间的差值 -TIMEDIFFرʾΪʱֵexpr1 - expr2ĽֵΪTIME +TIMEDIFF函数返回表示为时间值的expr1 - expr2的结果,返回值为TIME类型 -ڴ-838:59:59838:59:59֮TIMEֵΧ +其结果被限制在从-838:59:59到838:59:59之间的TIME值范围内 ## Examples @@ -43,4 +43,3 @@ mysql> SELECT TIMEDIFF('2019-01-01 00:00:00', NULL); | NULL | +---------------------------------------+ ``` - diff --git a/docs/documentation/cn/sql-reference/sql-functions/string-functions/split_part.md b/docs/documentation/cn/sql-reference/sql-functions/string-functions/split_part.md index 2564a57855..142a567c41 100644 --- a/docs/documentation/cn/sql-reference/sql-functions/string-functions/split_part.md +++ b/docs/documentation/cn/sql-reference/sql-functions/string-functions/split_part.md @@ -11,19 +11,19 @@ ## Examples ``` -mysql> select split_part("hello word", " ", 1); +mysql> select split_part("hello world", " ", 1); +----------------------------------+ -| split_part('hello word', ' ', 1) | +| split_part('hello world', ' ', 1) | +----------------------------------+ | hello | +----------------------------------+ -mysql> select split_part("hello word", " ", 2); +mysql> select split_part("hello world", " ", 2); +----------------------------------+ -| split_part('hello word', ' ', 2) | +| split_part('hello world', ' ', 2) | +----------------------------------+ -| word | +| world | +----------------------------------+ mysql> select split_part("2019年7月8号", "月", 1);