From 23535824934ff050060a3a9b1578966a2cc45f4d Mon Sep 17 00:00:00 2001
From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com>
Date: Thu, 19 Oct 2023 15:43:22 +0800
Subject: [PATCH] [enhancement](load) support enclose and escape for broker load,
routine load, mysql load and add docs (#25528)
Test cases will be added later.
---
.../import/import-way/stream-load-manual.md | 8 ++++++++
.../Load/BROKER-LOAD.md | 8 ++++++++
.../Load/CREATE-ROUTINE-LOAD.md | 6 ++++++
.../Load/MYSQL-LOAD.md | 4 ++++
.../Load/STREAM-LOAD.md | 4 ++++
.../import/import-way/stream-load-manual.md | 8 ++++++++
.../Load/BROKER-LOAD.md | 8 ++++++++
.../Load/CREATE-ROUTINE-LOAD.md | 6 ++++++
.../Load/MYSQL-LOAD.md | 4 ++++
.../Load/STREAM-LOAD.md | 4 ++++
.../doris/analysis/CreateRoutineLoadStmt.java | 20 +++++++++++++++++++
.../doris/analysis/DataDescription.java | 19 +++++++++++++++++-
.../org/apache/doris/analysis/LoadStmt.java | 4 ++++
.../doris/load/loadv2/MysqlLoadManager.java | 12 +++++++++++
.../load/routineload/RoutineLoadJob.java | 7 ++++++-
15 files changed, 120 insertions(+), 2 deletions(-)
diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
index a6c5683615..4c7d4524b1 100644
--- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -182,6 +182,14 @@ Stream load uses HTTP protocol, so all parameters related to import tasks are se
Stream load import can enable two-stage transaction commit mode: in the stream load process, the data is written and the information is returned to the user. At this time, the data is invisible and the transaction status is `PRECOMMITTED`. After the user manually triggers the commit operation, the data is visible.
++ enclose
+
+ When a csv data field contains row delimiters or column delimiters, a single-byte character can be specified as the enclosing character to prevent accidental truncation. For example, if the column separator is ",", the enclosing character is "'", and the data is "a,'b,c'", then "b,c" will be parsed as one field.
+
++ escape
+
+ Used to escape characters in a csv field that are identical to the enclosing character. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,'b,\'c'".
+
Example:
1. Initiate a stream load pre-commit operation
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 2136ad22f7..4869c988bd 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -204,6 +204,14 @@ WITH BROKER broker_name
Set the priority of the load job, there are three options: `HIGH/NORMAL/LOW`, use `NORMAL` priority as default. The pending broker load jobs which have higher priority will be chosen to execute earlier.
+ - enclose
+
+ When a csv data field contains row delimiters or column delimiters, a single-byte character can be specified as the enclosing character to prevent accidental truncation. For example, if the column separator is ",", the enclosing character is "'", and the data is "a,'b,c'", then "b,c" will be parsed as one field.
+
+ - escape
+
+ Used to escape characters in a csv field that are identical to the enclosing character. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,'b,\'c'".
+
- comment
Specify the comment for the import job. The comment can be viewed in the `show load` statement.
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index f9484d63c5..86e9b246f5 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -258,6 +258,12 @@ FROM data_source [data_source_properties]
The sampling window is `max_batch_rows * 10`. That is, if the number of error lines / total lines is greater than `max_filter_ratio` within the sampling window, the routine operation will be suspended, requiring manual intervention to check data quality problems.
Rows that are filtered out by where conditions are not considered error rows.
+
+ 14. `enclose`
+ When a csv data field contains row delimiters or column delimiters, a single-byte character can be specified as the enclosing character to prevent accidental truncation. For example, if the column separator is ",", the enclosing character is "'", and the data is "a,'b,c'", then "b,c" will be parsed as one field.
+
+ 15. `escape`
+ Used to escape characters in a csv field that are identical to the enclosing character. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,'b,\'c'".
- `FROM data_source [data_source_properties]`
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
index 69c6fe5053..673b21304f 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
@@ -79,6 +79,10 @@ This import method can still guarantee the atomicity of a batch of import tasks,
6. trim_double_quotes: Boolean type, The default value is false. True means that the outermost double quotes of each field in the load file are trimmed.
+7. enclose: When a csv data field contains row delimiters or column delimiters, a single-byte character can be specified as the enclosing character to prevent accidental truncation. For example, if the column separator is ",", the enclosing character is "'", and the data is "a,'b,c'", then "b,c" will be parsed as one field.
+
+8. escape: Used to escape characters in a csv field that are identical to the enclosing character. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,'b,\'c'".
+
### Example
1. Import the data from the client side local file `testData` into the table `testTbl` in the database `testDb`. Specify a timeout of 100 seconds
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 039462c219..60a9c44fc0 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -156,6 +156,10 @@ separated by commas.
28. comment: String type, the default value is "".
+29. enclose: When a csv data field contains row delimiters or column delimiters, a single-byte character can be specified as the enclosing character to prevent accidental truncation. For example, if the column separator is ",", the enclosing character is "'", and the data is "a,'b,c'", then "b,c" will be parsed as one field.
+
+30. escape: Used to escape characters in a csv field that are identical to the enclosing character. For example, if the data is "a,'b,'c'", enclose is "'", and you want "b,'c" to be parsed as one field, you need to specify a single-byte escape character, such as "\", and then modify the data to "a,'b,\'c'".
+
### Example
1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index 946c057198..162bbe2e60 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -193,6 +193,14 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的
Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为`PRECOMMITTED`,用户手动触发commit操作之后,数据才可见。
+- enclose
+
+ 包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
+
+- escape
+
+ 转义符。用于转义在csv字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c"被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
+
示例:
1. 发起stream load预提交操作
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index baea6beabc..d30d25baa7 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -203,6 +203,14 @@ WITH BROKER broker_name
设置导入任务的优先级,可选 `HIGH/NORMAL/LOW` 三种优先级,默认为 `NORMAL`,对于处在 `PENDING` 状态的导入任务,更高优先级的任务将优先被执行进入 `LOADING` 状态。
+ - enclose
+
+ 包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
+
+ - escape
+
+ 转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c"被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
+
- comment
指定导入任务的备注信息。可选参数。
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 4e9b55d49c..a365b26ea3 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -259,6 +259,12 @@ FROM data_source [data_source_properties]
被 where 条件过滤掉的行不算错误行。
+ 14. `enclose`
+ 包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
+
+ 15. `escape`
+ 转义符。用于转义在csv字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c"被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
+
- `FROM data_source [data_source_properties]`
数据源的类型。当前支持:
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
index b5d6e852b1..fd77e82457 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md
@@ -78,6 +78,10 @@ INTO TABLE tbl_name
6. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉导入文件每个字段最外层的双引号。
+7. enclose: 包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
+
+8. escape: 转义符。用于转义在csv字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c"被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
+
### Example
1. 将客户端本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表。指定超时时间为 100 秒
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 8ca18f74df..727abb2580 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -152,6 +152,10 @@ curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_h
28. comment: 字符串类型, 默认值为空. 给任务增加额外的信息.
+29. enclose: 包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
+
+30. escape: 转义符。用于转义在字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c"被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
+
### Example
1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 94655cbb95..1f49a42995 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -175,6 +175,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private boolean numAsString = false;
private boolean fuzzyParse = false;
+ private String enclose;
+
+ private String escape;
+
/**
* support partial columns load(Only Unique Key Columns)
*/
@@ -301,6 +305,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return jsonPaths;
}
+ public String getEnclose() {
+ return enclose;
+ }
+
+ public String getEscape() {
+ return escape;
+ }
+
public String getJsonRoot() {
return jsonRoot;
}
@@ -484,6 +496,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
+ enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE);
+ if (enclose != null && enclose.length() != 1) {
+ throw new AnalysisException("enclose must be single-char");
+ }
+ escape = jobProperties.get(LoadStmt.KEY_ESCAPE);
+ if (escape != null && escape.length() != 1) {
+ throw new AnalysisException("escape must be single-char");
+ }
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 47e1e66925..9e7e2f656d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -51,6 +51,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -865,7 +866,7 @@ public class DataDescription implements InsertStmt.DataDesc {
return;
}
String columnsSQL = "COLUMNS (" + columnDef + ")";
- SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
+ SqlParser parser = new SqlParser(new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
@@ -1001,6 +1002,22 @@ public class DataDescription implements InsertStmt.DataDesc {
if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) {
skipLines = Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES));
}
+ if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) {
+ String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE);
+ if (encloseProp.length() == 1) {
+ enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0];
+ } else {
+ throw new AnalysisException("enclose must be single-char");
+ }
+ }
+ if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) {
+ String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE);
+ if (escapeProp.length() == 1) {
+ escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0];
+ } else {
+ throw new AnalysisException("escape must be single-char");
+ }
+ }
}
private void checkLoadPriv(String fullDbName) throws AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 87b6a4cedd..ae2a18d47d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -125,6 +125,10 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_COMMENT = "comment";
+ public static final String KEY_ENCLOSE = "enclose";
+
+ public static final String KEY_ESCAPE = "escape";
+
private final LabelName label;
private final List dataDescriptions;
private final BrokerDesc brokerDesc;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index ae1871e0d9..c25699cc63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -372,6 +372,18 @@ public class MysqlLoadManager {
String trimQuotes = props.get(LoadStmt.KEY_TRIM_DOUBLE_QUOTES);
httpPut.addHeader(LoadStmt.KEY_TRIM_DOUBLE_QUOTES, trimQuotes);
}
+
+ // enclose
+ if (props.containsKey(LoadStmt.KEY_ENCLOSE)) {
+ String enclose = props.get(LoadStmt.KEY_ENCLOSE);
+ httpPut.addHeader(LoadStmt.KEY_ENCLOSE, enclose);
+ }
+
+ //escape
+ if (props.containsKey(LoadStmt.KEY_ESCAPE)) {
+ String escape = props.get(LoadStmt.KEY_ESCAPE);
+ httpPut.addHeader(LoadStmt.KEY_ESCAPE, escape);
+ }
}
// skip_lines
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index efa498c56b..9b3ac1b845 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -351,7 +351,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
this.isPartialUpdate = true;
}
jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
-
if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
jobProperties.put(PROPS_FORMAT, "csv");
} else if (stmt.getFormat().equals("json")) {
@@ -385,6 +384,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
} else {
jobProperties.put(PROPS_FUZZY_PARSE, "false");
}
+ if (stmt.getEnclose() != null) {
+ jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose());
+ }
+ if (stmt.getEscape() != null) {
+ jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
+ }
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {