From 59abd65bfc55503e92a33f3ee01cace81966c581 Mon Sep 17 00:00:00 2001 From: "jiafeng.zhang" Date: Wed, 13 Apr 2022 09:47:30 +0800 Subject: [PATCH] add dml load sql help (#8939) add dml load sql help --- .../Load/ALTER-ROUTINE-LOAD.md | 89 ++++ .../Load/BROKER-LOAD.md | 400 ++++++++++++++ .../Load/CANCEL-LOAD.md | 33 +- .../Load/CREATE-ROUTINE-LOAD.md | 485 +++++++++++++++++ .../Load/CREATE-SYNC-JOB.md | 135 +++++ .../Load/PAUSE-ROUTINE-LOAD.md | 22 + .../Load/PAUSE-SYNC-JOB.md | 18 + .../Load/RESUME-ROUTINE-LOAD.md | 22 + .../Load/RESUME-SYNC-JOB.md | 18 + .../Load/STOP-ROUTINE-LOAD.md | 16 + .../Load/STOP-SYNC-JOB.md | 18 + .../Load/STREAM-LOAD.md | 406 +++++++++++++++ .../Load/ALTER-ROUTINE-LOAD.md | 88 ++++ .../Load/BROKER-LOAD.md | 399 ++++++++++++++ .../Load/CANCEL-LOAD.md | 30 ++ .../Load/CREATE-ROUTINE-LOAD.md | 487 ++++++++++++++++++ .../Load/CREATE-SYNC-JOB.md | 135 +++++ .../Load/PAUSE-ROUTINE-LOAD.md | 22 + .../Load/PAUSE-SYNC-JOB.md | 18 + .../Load/RESUME-ROUTINE-LOAD.md | 22 + .../Load/RESUME-SYNC-JOB.md | 18 + .../Load/STOP-ROUTINE-LOAD.md | 16 + .../Load/STOP-SYNC-JOB.md | 18 + .../Load/STREAM-LOAD.md | 380 ++++++++++++++ 24 files changed, 3294 insertions(+), 1 deletion(-) diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md index 8092c7644b..3a9d4d79cf 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md @@ -26,10 +26,99 @@ under the License. ## ALTER-ROUTINE-LOAD +### Name + +ALTER ROUTINE LOAD + ### Description +This syntax is used to modify an already created routine import job. + +Only jobs in the PAUSED state can be modified. + +grammar: + +```sql +ALTER ROUTINE LOAD FOR [db.]job_name +[job_properties] +FROM data_source +[data_source_properties] +```` + +1. `[db.]job_name` + + Specifies the job name to modify. + +2. `tbl_name` + + Specifies the name of the table to be imported. + +3. `job_properties` + + Specifies the job parameters that need to be modified. Currently, only the modification of the following parameters is supported: + + 1. `desired_concurrent_number` + 2. `max_error_number` + 3. `max_batch_interval` + 4. `max_batch_rows` + 5. `max_batch_size` + 6. `jsonpaths` + 7. `json_root` + 8. `strip_outer_array` + 9. `strict_mode` + 10. `timezone` + 11. `num_as_string` + 12. `fuzzy_parse` + + +4. `data_source` + + The type of data source. Currently supports: + + KAFKA + +5. `data_source_properties` + + Relevant properties of the data source. Currently only supports: + + 1. `kafka_partitions` + 2. `kafka_offsets` + 3. `kafka_broker_list` + 4. `kafka_topic` + 5. Custom properties, such as `property.group.id` + + Note: + + 1. `kafka_partitions` and `kafka_offsets` are used to modify the offset of the kafka partition to be consumed, only the currently consumed partition can be modified. Cannot add partition. + ### Example +1. Change `desired_concurrent_number` to 1 + + ```sql + ALTER ROUTINE LOAD FOR db1.label1 + PROPERTIES + ( + "desired_concurrent_number" = "1" + ); + ```` + +2. Modify `desired_concurrent_number` to 10, modify the offset of the partition, and modify the group id. 
+ + ```sql + ALTER ROUTINE LOAD FOR db1.label1 + PROPERTIES + ( + "desired_concurrent_number" = "10" + ) + FROM kafka + ( + "kafka_partitions" = "0, 1, 2", + "kafka_offsets" = "100, 200, 100", + "property.group.id" = "new_group" + ); + ```` + ### Keywords ALTER, ROUTINE, LOAD diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md index bad06e60a8..8b12356eef 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -26,13 +26,413 @@ under the License. ## BROKER-LOAD +### Name + +BROKER LOAD + ### Description +This command is mainly used to import data on remote storage (such as S3, HDFS) through the Broker service process. + +```sql +LOAD LABEL load_label +( +data_desc1[, data_desc2, ...] +) +WITH BROKER broker_name +[broker_properties] +[load_properties]; +```` + +- `load_label` + + Each import needs to specify a unique Label. You can use this label to view the progress of the job later. + + `[database.]label_name` + +- `data_desc1` + + Used to describe a set of files that need to be imported. + + ```sql + [MERGE|APPEND|DELETE] + DATA INFILE + ( + "file_path1"[, file_path2, ...] + ) + [NEGATIVE] + INTO TABLE `table_name` + [PARTITION (p1, p2, ...)] + [COLUMNS TERMINATED BY "column_separator"] + [FORMAT AS "file_type"] + [(column_list)] + [COLUMNS FROM PATH AS (c1, c2, ...)] + [PRECEDING FILTER predicate] + [SET (column_mapping)] + [WHERE predicate] + [DELETE ON expr] + [ORDER BY source_sequence] + ```` + + - `[MERGE|APPEND|DELETE]` + + Data merge type. The default is APPEND, indicating that this import is a normal append write operation. The MERGE and DELETE types are only available for Unique Key model tables. The MERGE type needs to be used with the `[DELETE ON]` statement to mark the Delete Flag column. The DELETE type indicates that all data imported this time are deleted data. + + - `DATA INFILE` + + Specify the file path to be imported. Can be multiple. Wildcards can be used. The path must eventually match to a file, if it only matches a directory the import will fail. + + - `NEGTIVE` + + This keyword is used to indicate that this import is a batch of "negative" imports. This method is only for aggregate data tables with integer SUM aggregate type. This method will reverse the integer value corresponding to the SUM aggregate column in the imported data. Mainly used to offset previously imported wrong data. + + - `PARTITION(p1, p2, ...)` + + You can specify to import only certain partitions of the table. Data that is no longer in the partition range will be ignored. + + - `COLUMNS TERMINATED BY` + + Specifies the column separator. Only valid in CSV format. Only single-byte delimiters can be specified. + + - `FORMAT AS` + + Specifies the file type, CSV, PARQUET and ORC formats are supported. Default is CSV. + + - `column list` + + Used to specify the column order in the original file. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](..../../../data-operate/import/import-scenes/load-data-convert.html) document. + + `(k1, k2, tmpk1)` + + - `COLUMNS FROM PATH AS` + + Specifies the columns to extract from the import file path. + + - `PRECEDING FILTER predicate` + + Pre-filter conditions. 
The data is first concatenated into raw data rows in order according to `column list` and `COLUMNS FROM PATH AS`. Then filter according to the pre-filter conditions. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document. + + - `SET (column_mapping)` + + Specifies the conversion function for the column. + + - `WHERE predicate` + + Filter imported data based on conditions. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document. + + - `DELETE ON expr` + + It needs to be used with the MEREGE import mode, only for the table of the Unique Key model. Used to specify the columns and calculated relationships in the imported data that represent the Delete Flag. + + - `ORDER BY` + + Tables only for the Unique Key model. Used to specify the column in the imported data that represents the Sequence Col. Mainly used to ensure data order when importing. + +- `WITH BROKER broker_name` + + Specify the Broker service name to be used. In the public cloud Doris. Broker service name is `bos` + +- `broker_properties` + + Specifies the information required by the broker. This information is usually used by the broker to be able to access remote storage systems. Such as BOS or HDFS. See the [Broker](../../../advanced/broker.html) documentation for specific information. + + ````text + ( + "key1" = "val1", + "key2" = "val2", + ... + ) + ```` + +- `load_properties` + + Specifies import-related parameters. The following parameters are currently supported: + + - `timeout` + + Import timeout. The default is 4 hours. in seconds. + + - `max_filter_ratio` + + The maximum tolerable proportion of data that can be filtered (for reasons such as data irregularity). Zero tolerance by default. The value range is 0 to 1. + + - `exec_mem_limit` + + Import memory limit. Default is 2GB. The unit is bytes. + + - `strict_mode` + + Whether to impose strict restrictions on data. Defaults to false. + + - `timezone` + + Specify the time zone for some functions that are affected by time zones, such as `strftime/alignment_timestamp/from_unixtime`, etc. Please refer to the [timezone](../../advanced/time-zone.html) documentation for details. If not specified, the "Asia/Shanghai" timezone is used + ### Example +1. Import a batch of data from HDFS + + ```sql + LOAD LABEL example_db.label1 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "," + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ```` + + Import the file `file.txt`, separated by commas, into the table `my_table`. + +2. Import data from HDFS, using wildcards to match two batches of files in two batches. into two tables separately. + + ```sql + LOAD LABEL example_db.label2 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*") + INTO TABLE `my_table1` + PARTITION (p1) + COLUMNS TERMINATED BY "," + (k1, tmp_k2, tmp_k3) + SET ( + k2 = tmp_k2 + 1, + k3 = tmp_k3 + 1 + ) + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*") + INTO TABLE `my_table2` + COLUMNS TERMINATED BY "," + (k1, k2, k3) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ```` + + Import two batches of files `file-10*` and `file-20*` using wildcard matching. Imported into two tables `my_table1` and `my_table2` respectively. 
Where `my_table1` specifies to import into partition `p1`, and will import the values of the second and third columns in the source file +1. + +3. Import a batch of data from HDFS. + + ```sql + LOAD LABEL example_db.label3 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "\\x01" + ) + WITH BROKER my_hdfs_broker + ( + "username" = "", + "password" = "", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + ```` + + Specify the delimiter as Hive's default delimiter `\\x01`, and use the wildcard * to specify all files in all directories under the `data` directory. Use simple authentication while configuring namenode HA. + +4. Import data in Parquet format and specify FORMAT as parquet. The default is to judge by the file suffix + + ```sql + LOAD LABEL example_db.label4 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file") + INTO TABLE `my_table` + FORMAT AS "parquet" + (k1, k2, k3) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ```` + +5. Import the data and extract the partition field in the file path + + ```sql + LOAD LABEL example_db.label10 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*") + INTO TABLE `my_table` + FORMAT AS "csv" + (k1, k2, k3) + COLUMNS FROM PATH AS (city, utc_date) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ```` + + The columns in the `my_table` table are `k1, k2, k3, city, utc_date`. + + The `hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing` directory includes the following files: + + ````text + hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv + ```` + + The file only contains three columns of `k1, k2, k3`, and the two columns of `city, utc_date` will be extracted from the file path. + +6. Filter the data to be imported. + + ```sql + LOAD LABEL example_db.label6 + ( + DATA INFILE("hdfs://host:port/input/file") + INTO TABLE `my_table` + (k1, k2, k3) + PRECEDING FILTER k1 = 1 + SET ( + k2 = k2 + 1 + ) + WHERE k1 > k2 + ) + WITH BROKER hdfs + ( + "username"="user", + "password"="pass" + ); + ```` + + Only in the original data, k1 = 1, and after transformation, rows with k1 > k2 will be imported. + +7. 
Import data, extract the time partition field in the file path, and the time contains %3A (in the hdfs path, ':' is not allowed, all ':' will be replaced by %3A) + + ```sql + LOAD LABEL example_db.label7 + ( + DATA INFILE("hdfs://host:port/user/data/*/test.txt") + INTO TABLE `tbl12` + COLUMNS TERMINATED BY "," + (k2,k3) + COLUMNS FROM PATH AS (data_time) + SET ( + data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s') + ) + ) + WITH BROKER hdfs + ( + "username"="user", + "password"="pass" + ); + ```` + + There are the following files in the path: + + ````text + /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt + /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt + ```` + + The table structure is: + + ````text + data_time DATETIME, + k2 INT, + k3 INT + ```` + +8. Import a batch of data from HDFS, specify the timeout and filter ratio. Broker with clear text my_hdfs_broker. Simple authentication. And delete the columns in the original data that match the columns with v2 greater than 100 in the imported data, and other columns are imported normally + + ```sql + LOAD LABEL example_db.label8 + ( + MERGE DATA INFILE("HDFS://test:802/input/file") + INTO TABLE `my_table` + (k1, k2, k3, v2, v1) + DELETE ON v2 > 100 + ) + WITH HDFS + ( + "username"="user", + "password"="pass" + ) + PROPERTIES + ( + "timeout" = "3600", + "max_filter_ratio" = "0.1" + ); + ```` + + Import using the MERGE method. `my_table` must be a table with Unique Key. When the value of the v2 column in the imported data is greater than 100, the row is considered a delete row. + + The import task timeout is 3600 seconds, and the error rate is allowed to be within 10%. + +9. Specify the source_sequence column when importing to ensure the replacement order in the UNIQUE_KEYS table: + + ```sql + LOAD LABEL example_db.label9 + ( + DATA INFILE("HDFS://test:802/input/file") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "," + (k1,k2,source_sequence,v1,v2) + ORDER BY source_sequence + ) + WITH HDFS + ( + "username"="user", + "password"="pass" + ) + ```` + + `my_table` must be an Unqiue Key model table with Sequence Col specified. The data will be ordered according to the value of the `source_sequence` column in the source data. + ### Keywords BROKER, LOAD ### Best Practice +1. Check the import task status + + Broker Load is an asynchronous import process. The successful execution of the statement only means that the import task is submitted successfully, and does not mean that the data import is successful. The import status needs to be viewed through the [SHOW LOAD](../../Show-Statements/SHOW-LOAD.html) command. + +2. Cancel the import task + + Import tasks that have been submitted but not yet completed can be canceled by the [CANCEL LOAD](./CANCEL-LOAD.html) command. After cancellation, the written data will also be rolled back and will not take effect. + +3. Label, import transaction, multi-table atomicity + + All import tasks in Doris are atomic. And the import of multiple tables in the same import task can also guarantee atomicity. At the same time, Doris can also use the Label mechanism to ensure that the data imported is not lost or heavy. For details, see the [Import Transactions and Atomicity](../../../data-operate/import/import-scenes/load-atomicity.html) documentation. + +4. Column mapping, derived columns and filtering + + Doris can support very rich column transformation and filtering operations in import statements. Most built-in functions and UDFs are supported. 
For how to use this function correctly, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document. + +5. Error data filtering + + Doris' import tasks can tolerate a portion of malformed data. Tolerated via `max_filter_ratio` setting. The default is 0, which means that the entire import task will fail when there is an error data. If the user wants to ignore some problematic data rows, the secondary parameter can be set to a value between 0 and 1, and Doris will automatically skip the rows with incorrect data format. + + For some calculation methods of the tolerance rate, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document. + +6. Strict Mode + + The `strict_mode` attribute is used to set whether the import task runs in strict mode. The format affects the results of column mapping, transformation, and filtering. For a detailed description of strict mode, see the [strict mode](../../../data-operate/import/import-scenes/load-strict-mode.html) documentation. + +7. Timeout + + The default timeout for Broker Load is 4 hours. from the time the task is submitted. If it does not complete within the timeout period, the task fails. + +8. Limits on data volume and number of tasks + + Broker Load is suitable for importing data within 100GB in one import task. Although theoretically there is no upper limit on the amount of data imported in one import task. But committing an import that is too large results in a longer run time, and the cost of retrying after a failure increases. + + At the same time, limited by the size of the cluster, we limit the maximum amount of imported data to the number of ComputeNode nodes * 3GB. In order to ensure the rational use of system resources. If there is a large amount of data to be imported, it is recommended to divide it into multiple import tasks. + + Doris also limits the number of import tasks running simultaneously in the cluster, usually ranging from 3 to 10. Import jobs submitted after that are queued. The maximum queue length is 100. Subsequent submissions will be rejected outright. Note that the queue time is also calculated into the total job time. If it times out, the job is canceled. Therefore, it is recommended to reasonably control the frequency of job submission by monitoring the running status of the job. + diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md index dfa1b883eb..79484cf0b4 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md @@ -26,13 +26,44 @@ under the License. ## CANCEL-LOAD +### Name + +CANCEL LOAD + ### Description +This statement is used to undo an import job for the specified label. Or batch undo import jobs via fuzzy matching + +```sql +CANCEL LOAD +[FROM db_name] +WHERE [LABEL = "load_label" | LABEL like "label_pattern"]; +```` + ### Example +1. Cancel the import job whose label is `example_db_test_load_label` on the database example_db + + ```sql + CANCEL LOAD + FROM example_db + WHERE LABEL = "example_db_test_load_label"; + ```` + +2. Cancel all import jobs containing example* on the database example*db. 
+
+   ```sql
+   CANCEL LOAD
+   FROM example_db
+   WHERE LABEL like "example_";
+   ````
+
 ### Keywords
 
-    CANCEL, LOAD
+CANCEL, LOAD
 
 ### Best Practice
 
+1. Only import jobs in the PENDING, ETL or LOADING state can be canceled.
+2. When performing a batch cancellation, Doris does not guarantee that all matching import jobs are canceled atomically. That is, it is possible that only some of the import jobs were successfully canceled. The user can check the job status with the SHOW LOAD statement and, if necessary, execute the CANCEL LOAD statement again.
+
diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index c4f1f52119..92e30dd3a9 100644
--- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -26,13 +26,498 @@ under the License.
 
 ## CREATE-ROUTINE-LOAD
 
+### Name
+
+CREATE ROUTINE LOAD
+
 ### Description
 
+The Routine Load function allows users to submit a resident import job that continuously reads data from a specified data source and imports it into Doris.
+
+Currently, only CSV or JSON data can be imported from Kafka, either without authentication or via SSL authentication.
+
+Syntax:
+
+```sql
+CREATE ROUTINE LOAD [db.]job_name ON tbl_name
+[merge_type]
+[load_properties]
+[job_properties]
+FROM data_source [data_source_properties]
+````
+
+- `[db.]job_name`
+
+  The name of the import job. Within the same database, only one job with the same name can be running.
+
+- `tbl_name`
+
+  Specifies the name of the table to be imported.
+
+- `merge_type`
+
+  Data merge type. The default is APPEND, which means that the imported data are ordinary append writes. The MERGE and DELETE types are only available for Unique Key model tables. The MERGE type needs to be used with the [DELETE ON] statement to mark the Delete Flag column. The DELETE type means that all imported data are deleted data.
+
+- `load_properties`
+
+  Used to describe the imported data. The composition is as follows:
+
+  ````SQL
+  [column_separator],
+  [columns_mapping],
+  [preceding_filter],
+  [where_predicates],
+  [partitions],
+  [DELETE ON],
+  [ORDER BY]
+  ````
+
+  - `column_separator`
+
+    Specifies the column separator, defaults to `\t`
+
+    `COLUMNS TERMINATED BY ","`
+
+  - `columns_mapping`
+
+    Used to specify the mapping relationship between file columns and columns in the table, as well as various column transformations. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.
+
+    `(k1, k2, tmpk1, k3 = tmpk1 + 1)`
+
+  - `preceding_filter`
+
+    Filters the raw data. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.
+
+  - `where_predicates`
+
+    Filters the imported data based on conditions. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.
+
+    `WHERE k1 > 100 and k2 = 1000`
+
+  - `partitions`
+
+    Specifies which partitions of the destination table to import into. If not specified, data is automatically routed to the corresponding partitions.
+
+    `PARTITION(p1, p2, p3)`
+
+  - `DELETE ON`
+
+    It needs to be used with the MERGE import mode, only for tables of the Unique Key model.
Used to specify the columns and calculated relationships in the imported data that represent the Delete Flag.
+
+    `DELETE ON v3 > 100`
+
+  - `ORDER BY`
+
+    Only for tables of the Unique Key model. Used to specify the column in the imported data that represents the Sequence Col. Mainly used to ensure data order when importing.
+
+- `job_properties`
+
+  Common parameters of the routine load job.
+
+  ````text
+  PROPERTIES (
+      "key1" = "val1",
+      "key2" = "val2"
+  )
+  ````
+
+  Currently we support the following parameters:
+
+  1. `desired_concurrent_number`
+
+     Desired concurrency. A routine load job will be divided into multiple subtasks for execution. This parameter specifies the maximum number of tasks a job can execute concurrently. Must be greater than 0. Default is 3.
+
+     This is not the actual concurrency; the actual concurrency is determined from the number of nodes in the cluster, the cluster load, and the data source.
+
+     `"desired_concurrent_number" = "3"`
+
+  2. `max_batch_interval/max_batch_rows/max_batch_size`
+
+     These three parameters represent:
+
+     1. The maximum execution time of each subtask, in seconds. The range is 5 to 60. Default is 10.
+     2. The maximum number of rows read by each subtask. Must be greater than or equal to 200000. The default is 200000.
+     3. The maximum number of bytes read by each subtask. The unit is bytes and the range is 100MB to 1GB. The default is 100MB.
+
+     These three parameters are used to control the execution time and processing volume of a subtask. When any one of them reaches its threshold, the subtask ends.
+
+     ````text
+     "max_batch_interval" = "20",
+     "max_batch_rows" = "300000",
+     "max_batch_size" = "209715200"
+     ````
+
+  3. `max_error_number`
+
+     The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, which means no error rows are allowed.
+
+     The sampling window is `max_batch_rows * 10`. That is, if the number of error rows within the sampling window is greater than `max_error_number`, the routine load job is paused and manual intervention is required to check for data quality problems.
+
+     Rows that are filtered out by where conditions are not counted as error rows.
+
+  4. `strict_mode`
+
+     Whether to enable strict mode. The default is off. If enabled, non-null source values whose column type conversion results in NULL are filtered out. Specify as:
+
+     `"strict_mode" = "true"`
+
+  5. `timezone`
+
+     Specifies the time zone used by the import job. The default is to use the Session's timezone parameter. This parameter affects the results of all time zone-related functions involved in the import.
+
+  6. `format`
+
+     Specifies the import data format. The default is csv; the json format is also supported.
+
+  7. `jsonpaths`
+
+     When the imported data format is json, the fields in the JSON data can be extracted by specifying jsonpaths.
+
+     `-H "jsonpaths: [\"$.k2\", \"$.k1\"]"`
+
+  8. `strip_outer_array`
+
+     When the imported data format is json and strip_outer_array is true, the JSON data is treated as an array and each element in it is handled as a row of data. The default value is false.
+
+     `-H "strip_outer_array: true"`
+
+  9. `json_root`
+
+     When the imported data format is json, you can specify the root node of the JSON data through json_root. Doris will extract the elements of the root node through json_root for parsing. Default is empty.
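+
+     As a sketch (the message layout here is only a hypothetical illustration), a Kafka message whose payload wraps the rows in a `RECORDS` array would need the root node set to `$.RECORDS` so that only the inner array is parsed:
+
+     ````text
+     {"RECORDS": [{"k1": 1, "k2": "a"}, {"k1": 2, "k2": "b"}]}
+     ````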
+
+     `-H "json_root: $.RECORDS"`
+
+- `FROM data_source [data_source_properties]`
+
+  The type of data source. Currently supports:
+
+  ````text
+  FROM KAFKA
+  (
+      "key1" = "val1",
+      "key2" = "val2"
+  )
+  ````
+
+  `data_source_properties` supports the following data source properties:
+
+  1. `kafka_broker_list`
+
+     Kafka's broker connection information, in the format host:port. Separate multiple brokers with commas.
+
+     `"kafka_broker_list" = "broker1:9092,broker2:9092"`
+
+  2. `kafka_topic`
+
+     Specifies the Kafka topic to subscribe to.
+
+     `"kafka_topic" = "my_topic"`
+
+  3. `kafka_partitions/kafka_offsets`
+
+     Specifies the Kafka partitions to subscribe to and the corresponding starting offset of each partition. If a time is specified, consumption will start at the nearest offset greater than or equal to that time.
+
+     The offset can be a specific value greater than or equal to 0, or:
+
+     - `OFFSET_BEGINNING`: Start subscribing from the position where data exists.
+     - `OFFSET_END`: Start subscribing from the end.
+     - A time format, such as: "2021-05-22 11:00:00"
+
+     If not specified, all partitions under the topic will be subscribed from `OFFSET_END` by default.
+
+     ````text
+     "kafka_partitions" = "0,1,2,3",
+     "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
+     ````
+
+     ````text
+     "kafka_partitions" = "0,1,2,3",
+     "kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
+     ````
+
+     Note that the time format cannot be mixed with the OFFSET format.
+
+  4. `property`
+
+     Specifies custom Kafka parameters. The function is equivalent to the "--property" parameter in the kafka shell.
+
+     When the value of the parameter is a file, you need to add the keyword "FILE:" before the value.
+
+     For how to create a file, please refer to the CREATE FILE command documentation.
+
+     For more supported custom parameters, please refer to the client-side configuration items in the official CONFIGURATION document of librdkafka. Such as:
+
+     ````text
+     "property.client.id" = "12345",
+     "property.ssl.ca.location" = "FILE:ca.pem"
+     ````
+
+     1. When connecting to Kafka using SSL, you need to specify the following parameters:
+
+        ````text
+        "property.security.protocol" = "ssl",
+        "property.ssl.ca.location" = "FILE:ca.pem",
+        "property.ssl.certificate.location" = "FILE:client.pem",
+        "property.ssl.key.location" = "FILE:client.key",
+        "property.ssl.key.password" = "abcdefg"
+        ````
+
+        Where:
+
+        `property.security.protocol` and `property.ssl.ca.location` are required to indicate that the connection method is SSL and to give the location of the CA certificate.
+
+        If client authentication is enabled on the Kafka server side, you also need to set:
+
+        ````text
+        "property.ssl.certificate.location"
+        "property.ssl.key.location"
+        "property.ssl.key.password"
+        ````
+
+        They are used to specify the client certificate, the client private key, and the password of the private key, respectively.
+
+     2. Specify the default starting offset of the Kafka partitions
+
+        If `kafka_partitions/kafka_offsets` is not specified, all partitions are consumed by default.
+
+        In this case, you can specify `kafka_default_offsets` to set the starting offset. It defaults to `OFFSET_END`, i.e. subscribing from the end.
+
+        Example:
+
+        ````text
+        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+        ````
+
 ### Example
 
+1. Create a Kafka routine import task named test1 for example_tbl of example_db.
Specify the column separator and group.id and client.id, and automatically consume all partitions by default, and start subscribing from the location where there is data (OFFSET_BEGINNING) + + + + ```sql + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS TERMINATED BY ",", + COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "property.group.id" = "xxx", + "property.client.id" = "xxx", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + ```` + +2. Create a Kafka routine import task named test1 for example_tbl of example_db. Import tasks are in strict mode. + + + + ```sql + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), + PRECEDING FILTER k1 = 1, + WHERE k1 > 100 and k2 like "%doris%" + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2,3", + "kafka_offsets" = "101,0,0,200" + ); + ```` + +3. Import data from the Kafka cluster through SSL authentication. Also set the client.id parameter. The import task is in non-strict mode and the time zone is Africa/Abidjan + + + + ```sql + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), + WHERE k1 > 100 and k2 like "%doris%" + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false", + "timezone" = "Africa/Abidjan" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "property.security.protocol" = "ssl", + "property.ssl.ca.location" = "FILE:ca.pem", + "property.ssl.certificate.location" = "FILE:client.pem", + "property.ssl.key.location" = "FILE:client.key", + "property.ssl.key.password" = "abcdefg", + "property.client.id" = "my_client_id" + ); + ```` + +4. Import data in Json format. By default, the field name in Json is used as the column name mapping. Specify to import three partitions 0, 1, and 2, and the starting offsets are all 0 + + + + ```sql + CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1 + COLUMNS(category,price,author) + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false", + "format" = "json" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2", + "kafka_offsets" = "0,0,0" + ); + ```` + +5. 
Import Json data, extract fields through Jsonpaths, and specify the root node of the Json document + + + + ```sql + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false", + "format" = "json", + "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", + "json_root" = "$.RECORDS" + "strip_outer_array" = "true" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2", + "kafka_offsets" = "0,0,0" + ); + ```` + +6. Create a Kafka routine import task named test1 for example_tbl of example_db. And use conditional filtering. + + + + ```sql + CREATE ROUTINE LOAD example_db.test1 ON example_tbl + WITH MERGE + COLUMNS(k1, k2, k3, v1, v2, v3), + WHERE k1 > 100 and k2 like "%doris%", + DELETE ON v3 >100 + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200", + "strict_mode" = "false" + ) + FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2,3", + "kafka_offsets" = "101,0,0,200" + ); + ```` + +7. Import data to Unique with sequence column Key model table + + + + ```sql + CREATE ROUTINE LOAD example_db.test_job ON example_tbl + COLUMNS TERMINATED BY ",", + COLUMNS(k1,k2,source_sequence,v1,v2), + ORDER BY source_sequence + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "30", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", + "kafka_topic" = "my_topic", + "kafka_partitions" = "0,1,2,3", + "kafka_offsets" = "101,0,0,200" + ); + ```` + +8. Consume from a specified point in time + + + + ```sql + CREATE ROUTINE LOAD example_db.test_job ON example_tbl + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "30", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) FROM KAFKA + ( + "kafka_broker_list" = "broker1:9092,broker2:9092", + "kafka_topic" = "my_topic", + "kafka_default_offset" = "2021-05-21 10:00:00" + ); + ```` + ### Keywords CREATE, ROUTINE, LOAD ### Best Practice +Partition and Offset for specified consumption + +Doris supports the specified Partition and Offset to start consumption, and also supports the function of consumption at a specified time point. The configuration relationship of the corresponding parameters is described here. + +There are three relevant parameters: + +- `kafka_partitions`: Specify a list of partitions to be consumed, such as "0, 1, 2, 3". +- `kafka_offsets`: Specify the starting offset of each partition, which must correspond to the number of `kafka_partitions` list. For example: "1000, 1000, 2000, 2000" +- `property.kafka_default_offset`: Specifies the default starting offset of the partition. 
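+
+As an illustration only (the broker list and topic below are placeholder values), a data source clause that pins both the partitions and their starting offsets, i.e. combination 4 in the table that follows, might look like:
+
+````text
+FROM KAFKA
+(
+    "kafka_broker_list" = "broker1:9092,broker2:9092",
+    "kafka_topic" = "my_topic",
+    "kafka_partitions" = "0,1,2,3",
+    "kafka_offsets" = "1000,1000,2000,2000"
+)
+````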
+ +When creating an import job, these three parameters can have the following combinations: + +| Composition | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offset` | Behavior | +| ----------- | ------------------ | --------------- | ------------------------------- | ------------------------------------------------------------ | +| 1 | No | No | No | The system will automatically find all partitions corresponding to the topic and start consumption from OFFSET_END | +| 2 | No | No | Yes | The system will automatically find all partitions corresponding to the topic and start consumption from the location specified by default offset | +| 3 | Yes | No | No | The system will start consumption from OFFSET_END of the specified partition | +| 4 | Yes | Yes | No | The system will start consumption from the specified offset of the specified partition | +| 5 | Yes | No | Yes | The system will start consumption from the specified partition, the location specified by default offset | diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-SYNC-JOB.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-SYNC-JOB.md index 3de21e8519..d93723e6f6 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-SYNC-JOB.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-SYNC-JOB.md @@ -26,10 +26,145 @@ under the License. ## CREATE-SYNC-JOB +### Name + +CREATE SYNC JOB + ### Description +The data synchronization (Sync Job) function supports users to submit a resident data synchronization job, and incrementally synchronizes the CDC (Change Data Capture) of the user's data update operation in the Mysql database by reading the Binlog log from the specified remote address. Features. + +Currently, the data synchronization job only supports connecting to Canal, obtaining the parsed Binlog data from the Canal Server and importing it into Doris. + +Users can view the data synchronization job status through [SHOW SYNC JOB](../../../sql-manual/sql-reference-v2/Show-Statements/SHOW-SYNC-JOB.html). + +grammar: + +```sql +CREATE SYNC [db.]job_name + ( + channel_desc, + channel_desc + ... + ) +binlog_desc +```` + +1. `job_name` + + The synchronization job name is the unique identifier of the job in the current database. Only one job with the same `job_name` can be running. + +2. `channel_desc` + + The data channel under the job is used to describe the mapping relationship between the mysql source table and the doris target table. + + grammar: + + ```sql + FROM mysql_db.src_tbl INTO des_tbl + [partitions] + [columns_mapping] + ```` + + 1. `mysql_db.src_tbl` + + Specify the database and source table on the mysql side. + + 2. `des_tbl` + + Specify the target table on the doris side. Only unique tables are supported, and the batch delete function of the table needs to be enabled (see the 'batch delete function' of help alter table for how to enable it). + + 3. `partitions` + + Specify in which partitions of the import destination table. If not specified, it will be automatically imported into the corresponding partition. + + Example: + + ```` + PARTITION(p1, p2, p3) + ```` + + 4. `column_mapping` + + Specifies the mapping relationship between the columns of the mysql source table and the doris target table. If not specified, FE will default the columns of the source table and the target table to one-to-one correspondence in order. 
+ + The form col_name = expr is not supported for columns. + + Example: + + ```` + Suppose the target table column is (k1, k2, v1), + + Change the order of columns k1 and k2 + COLUMNS(k2, k1, v1) + + Ignore the fourth column of the source data + COLUMNS(k2, k1, v1, dummy_column) + ```` + +3. `binlog_desc` + + Used to describe the remote data source, currently only one canal is supported. + + grammar: + + ```sql + FROM BINLOG + ( + "key1" = "value1", + "key2" = "value2" + ) + ```` + + 1. The properties corresponding to the Canal data source, prefixed with `canal.` + + 1. canal.server.ip: address of canal server + 2. canal.server.port: the port of the canal server + 3. canal.destination: the identity of the instance + 4. canal.batchSize: The maximum batch size obtained, the default is 8192 + 5. canal.username: username of instance + 6. canal.password: the password of the instance + 7. canal.debug: optional, when set to true, the batch and details of each row of data will be printed out + ### Example +1. Simply create a data synchronization job named `job1` for `test_tbl` of `test_db`, connect to the local Canal server, corresponding to the Mysql source table `mysql_db1.tbl1`. + + ````SQL + CREATE SYNC `test_db`.`job1` + ( + FROM `mysql_db1`.`tbl1` INTO `test_tbl` + ) + FROM BINLOG + ( + "type" = "canal", + "canal.server.ip" = "127.0.0.1", + "canal.server.port" = "11111", + "canal.destination" = "example", + "canal.username" = "", + "canal.password" = "" + ); + ```` + +2. Create a data synchronization job named `job1` for multiple tables of `test_db`, corresponding to multiple Mysql source tables one-to-one, and explicitly specify the column mapping. + + ````SQL + CREATE SYNC `test_db`.`job1` + ( + FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2), + FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1 + ) + FROM BINLOG + ( + "type" = "canal", + "canal.server.ip" = "xx.xxx.xxx.xx", + "canal.server.port" = "12111", + "canal.destination" = "example", + "canal.username" = "username", + "canal.password" = "password" + ); + ```` + ### Keywords CREATE, SYNC, JOB diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-ROUTINE-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-ROUTINE-LOAD.md index 63055f687c..a3e57e333d 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-ROUTINE-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-ROUTINE-LOAD.md @@ -26,10 +26,32 @@ under the License. ## PAUSE-ROUTINE-LOAD +### Name + +PAUSE ROUTINE LOAD + ### Description +Used to pause a Routine Load job. A suspended job can be rerun with the RESUME command. + +```sql +PAUSE [ALL] ROUTINE LOAD FOR job_name +```` + ### Example +1. Pause the routine import job named test1. + + ```sql + PAUSE ROUTINE LOAD FOR test1; + ```` + +2. Pause all routine import jobs. + + ```sql + PAUSE ALL ROUTINE LOAD; + ```` + ### Keywords PAUSE, ROUTINE, LOAD diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-SYNC-JOB.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-SYNC-JOB.md index 89157de16f..0af27181f4 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-SYNC-JOB.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/PAUSE-SYNC-JOB.md @@ -26,10 +26,28 @@ under the License. 
## PAUSE-SYNC-JOB +### Name + +PAUSE SYNC JOB + ### Description +Pause a running resident data synchronization job in a database via `job_name`. The suspended job will stop synchronizing data and keep the latest position of consumption until it is resumed by the user. + +grammar: + +```sql +PAUSE SYNC JOB [db.]job_name +```` + ### Example +1. Pause the data sync job named `job_name`. + + ```sql + PAUSE SYNC JOB `job_name`; + ```` + ### Keywords PAUSE, SYNC, JOB diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-ROUTINE-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-ROUTINE-LOAD.md index ee49c95db8..4b05d37ccf 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-ROUTINE-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-ROUTINE-LOAD.md @@ -26,10 +26,32 @@ under the License. ## RESUME-ROUTINE-LOAD +### Name + +RESUME ROUTINE LOAD + ### Description +Used to restart a suspended Routine Load job. The restarted job will continue to consume from the previously consumed offset. + +```sql +RESUME [ALL] ROUTINE LOAD FOR job_name +```` + ### Example +1. Restart the routine import job named test1. + + ```sql + RESUME ROUTINE LOAD FOR test1; + ```` + +2. Restart all routine import jobs. + + ```sql + RESUME ALL ROUTINE LOAD; + ```` + ### Keywords RESUME, ROUTINE, LOAD diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-SYNC-JOB.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-SYNC-JOB.md index 792aada047..4da91e225e 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-SYNC-JOB.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/RESUME-SYNC-JOB.md @@ -26,10 +26,28 @@ under the License. ## RESUME-SYNC-JOB +### Name + +RESUME SYNC JOB + ### Description +Resume a resident data synchronization job whose current database has been suspended by `job_name`, and the job will continue to synchronize data from the latest position before the last suspension. + +grammar: + +```sql +RESUME SYNC JOB [db.]job_name +```` + ### Example +1. Resume the data synchronization job named `job_name` + + ```sql + RESUME SYNC JOB `job_name`; + ```` + ### Keywords RESUME, SYNC, LOAD diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-ROUTINE-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-ROUTINE-LOAD.md index e8cd90cdec..375b994db9 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-ROUTINE-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-ROUTINE-LOAD.md @@ -26,10 +26,26 @@ under the License. ## STOP-ROUTINE-LOAD +### Name + +STOP ROUTINE LOAD + ### Description +User stops a Routine Load job. A stopped job cannot be rerun. + +```sql +STOP ROUTINE LOAD FOR job_name; +```` + ### Example +1. Stop the routine import job named test1. 
+ + ```sql + STOP ROUTINE LOAD FOR test1; + ```` + ### Keywords STOP, ROUTINE, LOAD diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-SYNC-JOB.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-SYNC-JOB.md index f9002b6880..cd2d125ad3 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-SYNC-JOB.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STOP-SYNC-JOB.md @@ -26,10 +26,28 @@ under the License. ## STOP-SYNC-JOB +### Name + +STOP SYNC JOB + ### Description +Stop a non-stop resident data synchronization job in a database by `job_name`. + +grammar: + +```sql +STOP SYNC JOB [db.]job_name +```` + ### Example +1. Stop the data sync job named `job_name` + + ```sql + STOP SYNC JOB `job_name`; + ```` + ### Keywords STOP, SYNC, JOB diff --git a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 6e7e24522b..8e3d85bbeb 100644 --- a/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/new-docs/en/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -26,13 +26,419 @@ under the License. ## STREAM-LOAD +### Name + +STREAM LOAD + ### Description +stream-load: load data to table in streaming + +```` +curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load +```` + +This statement is used to import data into the specified table. The difference from ordinary Load is that this import method is synchronous import. + + This import method can still ensure the atomicity of a batch of import tasks, either all data is imported successfully or all of them fail. + + This operation will update the data of the rollup table related to this base table at the same time. + + This is a synchronous operation. After the entire data import work is completed, the import result is returned to the user. + + Currently, HTTP chunked and non-chunked uploads are supported. For non-chunked methods, Content-Length must be used to indicate the length of the uploaded content, which can ensure the integrity of the data. + +In addition, it is best for users to set the content of the Expect Header field to 100-continue, which can avoid unnecessary data transmission in some error scenarios. + +Parameter introduction: + Users can pass in import parameters through the Header part of HTTP + +1. label: The label imported once, the data of the same label cannot be imported multiple times. Users can avoid the problem of duplicate data import by specifying Label. + + Currently, Doris retains the most recent successful label within 30 minutes. + +2. column_separator: used to specify the column separator in the import file, the default is \t. If it is an invisible character, you need to add \x as a prefix and use hexadecimal to represent the separator. + + For example, the separator \x01 of the hive file needs to be specified as -H "column_separator:\x01". + + You can use a combination of multiple characters as column separators. + +3. line_delimiter: used to specify the newline character in the imported file, the default is \n. Combinations of multiple characters can be used as newlines. + +4. columns: used to specify the correspondence between the columns in the import file and the columns in the table. 
If the column in the source file corresponds exactly to the content of the table, there is no need to specify this field.
+    If the source file does not correspond to the table schema, this field is required to perform some data conversion. There are two forms of column: one corresponds directly to a field in the imported file and is simply represented by the field name;
+    the other is a derived column, with the syntax `column_name = expression`. A few examples may help:
+    Example 1: The table has 3 columns "c1, c2, c3", and the three columns in the source file correspond to "c3, c2, c1" in order; then you need to specify -H "columns: c3, c2, c1"
+    Example 2: The table has 3 columns "c1, c2, c3", and the first three columns in the source file correspond in turn, but the source file has one extra column; then you need to specify -H "columns: c1, c2, c3, xxx";
+    the last column can be given an arbitrary name as a placeholder
+    Example 3: The table has three columns "year, month, day", and the source file has only one time column, in the "2018-06-01 01:02:03" format;
+    then you can specify -H "columns: col, year = year(col), month=month(col), day=day(col)" to complete the import
+
+5. where: used to filter the imported data. If the user needs to drop unwanted rows, this can be achieved by setting this option.
+    Example 1: To import only rows whose k1 column equals 20180601, specify -H "where: k1 = 20180601" when importing
+
+6. max_filter_ratio: The maximum tolerable ratio of data that can be filtered out (for reasons such as data irregularity). Zero tolerance by default. Rows filtered out by where conditions are not counted as irregular data.
+
+7. partitions: used to specify the partitions targeted by this import. If the user can determine which partitions the data belongs to, it is recommended to specify this item. Data that does not belong to these partitions will be filtered out.
+    For example, to import into the p1 and p2 partitions: -H "partitions: p1, p2"
+
+8. timeout: Specifies the import timeout, in seconds. The default is 600 seconds. The settable range is 1 second to 259200 seconds.
+
+9. strict_mode: Specifies whether to enable strict mode for this import. The default is off. It is enabled with -H "strict_mode: true".
+
+10. timezone: Specifies the time zone used for this import. The default is GMT+8 (Asia/Shanghai). This parameter affects the results of all time zone-related functions involved in the import.
+
+11. exec_mem_limit: Import memory limit. Default is 2GB. The unit is bytes.
+
+12. format: Specifies the import data format. The default is csv; the json format is also supported.
+
+13. jsonpaths: There are two ways to import json: simple mode and matching mode.
+
+    Simple mode: the mode used when the jsonpaths parameter is not set. In this mode, the json data is required to be of object type, for example:
+
+    ````
+    {"k1":1, "k2":2, "k3":"hello"}, where k1, k2, k3 are column names.
+    ````
+
+    Matching mode: used for relatively complex json data, where the corresponding values need to be matched through the jsonpaths parameter.
+
+14. strip_outer_array: Boolean type. true indicates that the json data starts with an array object and the array object is flattened; the default value is false. E.g:
+
+    ````
+    [
+      {"k1" : 1, "v1" : 2},
+      {"k1" : 3, "v1" : 4}
+    ]
+    When strip_outer_array is true, the final import into Doris will generate two rows of data.
+    ````
+
+15.
json_root: json_root is a valid jsonpath string, used to specify the root node of the json document, the default value is "". + +16. merge_type: The merge type of data, which supports three types: APPEND, DELETE, and MERGE. Among them, APPEND is the default value, which means that this batch of data needs to be appended to the existing data, and DELETE means to delete all the data with the same key as this batch of data. Line, the MERGE semantics need to be used in conjunction with the delete condition, which means that the data that meets the delete condition is processed according to the DELETE semantics and the rest is processed according to the APPEND semantics, for example: `-H "merge_type: MERGE" -H "delete: flag=1"` + +17. delete: Only meaningful under MERGE, indicating the deletion condition of the data + function_column.sequence_col: Only applicable to UNIQUE_KEYS. Under the same key column, ensure that the value column is REPLACEed according to the source_sequence column. The source_sequence can be a column in the data source or a column in the table structure. + +18. fuzzy_parse: Boolean type, true means that json will be parsed with the schema of the first row. Enabling this option can improve the efficiency of json import, but requires that the order of the keys of all json objects is the same as the first row, the default is false, only use in json format + +19. num_as_string: Boolean type, true means that when parsing json data, the numeric type will be converted to a string, and then imported without losing precision. + +20. read_json_by_line: Boolean type, true to support reading one json object per line, the default value is false. + +21. send_batch_parallelism: Integer, used to set the parallelism of sending batch data. If the value of parallelism exceeds `max_send_batch_parallelism_per_job` in the BE configuration, the BE as a coordination point will use the value of `max_send_batch_parallelism_per_job`. + + RETURN VALUES + After the import is complete, the related content of this import will be returned in Json format. Currently includes the following fields + Status: Import the last status. + Success: Indicates that the import is successful and the data is already visible; + Publish Timeout: Indicates that the import job has been successfully committed, but is not immediately visible for some reason. The user can consider the import to be successful and not have to retry the import + Label Already Exists: Indicates that the Label has been occupied by other jobs. It may be imported successfully or it may be being imported. + The user needs to determine the subsequent operation through the get label state command + Others: The import failed, the user can specify the Label to retry the job + Message: Detailed description of the import status. On failure, the specific failure reason is returned. + NumberTotalRows: The total number of rows read from the data stream + NumberLoadedRows: The number of data rows imported this time, only valid in Success + NumberFilteredRows: The number of rows filtered out by this import, that is, the number of rows with unqualified data quality + NumberUnselectedRows: This import, the number of rows filtered out by the where condition + LoadBytes: The size of the source file data imported this time + LoadTimeMs: The time taken for this import + BeginTxnTimeMs: The time it takes to request Fe to start a transaction, in milliseconds. 
    StreamLoadPutTimeMs: The time it takes to request the FE to obtain the execution plan for the import, in milliseconds.
    ReadDataTimeMs: The time spent reading data, in milliseconds.
    WriteDataTimeMs: The time spent performing the write data operation, in milliseconds.
    CommitAndPublishTimeMs: The time it takes to request the FE to commit and publish the transaction, in milliseconds.
    ErrorURL: A URL at which the content of the filtered rows can be viewed. Only the first 1000 rows are retained.

ERRORS:

    Import error details can be viewed with the following statement:

    ```sql
    SHOW LOAD WARNINGS ON 'url'
    ```

    where url is the URL given by ErrorURL.

### Example

1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds

   ````
   curl --location-trusted -u root -H "label:123" -H "timeout:100" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

2. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', use Label for deduplication, and only import rows whose k1 equals 20180601

   ````
   curl --location-trusted -u root -H "label:123" -H "where: k1=20180601" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

3. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', allowing a 20% error rate (the user is in the default_cluster)

   ````
   curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

4. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', allowing a 20% error rate and specifying the column names of the file (the user is in the default_cluster)

   ````
   curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

5. Import the data in the local file 'testData' into the p1 and p2 partitions of the table 'testTbl' in the database 'testDb', allowing a 20% error rate.

   ````
   curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

6. Import using streaming (the user is in the default_cluster)

   ````
   seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T - http://host:port/api/testDb/testTbl/_stream_load
   ````

7. Import into a table containing HLL columns. The HLL columns can be generated from columns in the table or from columns in the data, or hll_empty can be used to fill columns that are not present in the data

   ````
   curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

8. Import data with strict mode filtering and set the time zone to Africa/Abidjan

   ````
   curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

9. Import into a table with a BITMAP column. The BITMAP column can be generated from a column in the table or a column in the data, or bitmap_empty can be used to fill an empty Bitmap

   ````
   curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load
   ````

10. 
Import json data in simple mode
    Table structure:

    `category` varchar(512) NULL COMMENT "",
    `author` varchar(512) NULL COMMENT "",
    `title` varchar(512) NULL COMMENT "",
    `price` double NULL COMMENT ""

    json data format:

    ````
    {"category":"C++","author":"avc","title":"C++ primer","price":895}
    ````

    Import command:

    ````
    curl --location-trusted -u root -H "label:123" -H "format: json" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

    To improve throughput, multiple json records can be imported at once. Each line is one json object, with \n as the line separator by default, and read_json_by_line needs to be set to true. The json data format is as follows:

    ````
    {"category":"C++","author":"avc","title":"C++ primer","price":89.5}
    {"category":"Java","author":"avc","title":"Effective Java","price":95}
    {"category":"Linux","author":"avc","title":"Linux kernel","price":195}
    ````

11. Import json data in matching mode
    json data format:

    ````
    [
    {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},
    {"category":"xuxb222","author":"2avc","title":"SayingsoftheCentury","price":895},
    {"category":"xuxb333","author":"3avc","title":"SayingsoftheCentury","price":895}
    ]
    ````

    Import precisely by specifying jsonpaths, for example importing only the category, author, and price attributes

    ````
    curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

    Notes:
    1) If the json data starts with an array and each object in the array is one record, strip_outer_array needs to be set to true to flatten the array.
    2) If the json data starts with an array and each object in the array is one record, the ROOT node of the jsonpaths is actually an object in the array.

12. User-specified json root node
    json data format:

    ````
    {
     "RECORDS":[
    {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
    {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
    {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
    ]
    }
    ````

    Import precisely by specifying jsonpaths, for example importing only the category, author, and price attributes

    ````
    curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

13. Delete data that has the same keys as this batch of imported data

    ````
    curl --location-trusted -u root -H "merge_type: DELETE" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

14. Delete the existing rows that match the rows in this batch whose flag column is 1, and append the other rows normally

    ````
    curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

15. 
Import data into a UNIQUE_KEYS table with a sequence column

    ````
    curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load
    ````

### Keywords

    STREAM, LOAD

### Best Practice

1. Check the import task status

   Stream Load is a synchronous import process. A successful execution of the command means the data has been imported successfully. The execution result is returned synchronously in the HTTP response, in JSON format. An example is as follows:

   ````json
   {
       "TxnId": 17,
       "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
       "Status": "Success",
       "Message": "OK",
       "NumberTotalRows": 5,
       "NumberLoadedRows": 5,
       "NumberFilteredRows": 0,
       "NumberUnselectedRows": 0,
       "LoadBytes": 28,
       "LoadTimeMs": 27,
       "BeginTxnTimeMs": 0,
       "StreamLoadPutTimeMs": 2,
       "ReadDataTimeMs": 0,
       "WriteDataTimeMs": 3,
       "CommitAndPublishTimeMs": 18
   }
   ````

   The field definitions are as follows:

   - TxnId: The import transaction ID, automatically generated by the system and globally unique.

   - Label: The import Label; if not specified, the system generates a UUID.

   - Status:

     The import result. It has the following values:

     - Success: The import succeeded and the data is already visible.
     - Publish Timeout: The import has completed, but the data may become visible with a delay.
     - Label Already Exists: The Label is already in use and needs to be replaced.
     - Fail: The import failed.

   - ExistingJobStatus:

     The status of the import job corresponding to the existing Label.

     This field is only displayed when the Status is "Label Already Exists". It tells the user the state of the import job that holds the existing Label: "RUNNING" means the job is still executing, "FINISHED" means the job succeeded.

   - Message: The import error message.

   - NumberTotalRows: The total number of rows processed by the import.

   - NumberLoadedRows: The number of rows successfully imported.

   - NumberFilteredRows: The number of rows with unqualified data quality.

   - NumberUnselectedRows: The number of rows filtered out by the where condition.

   - LoadBytes: The number of bytes imported.

   - LoadTimeMs: The time taken to complete the import, in milliseconds.

   - BeginTxnTimeMs: The time it takes to request the FE to start a transaction, in milliseconds.

   - StreamLoadPutTimeMs: The time taken to request the FE to obtain the execution plan for the import, in milliseconds.

   - ReadDataTimeMs: The time spent reading data, in milliseconds.

   - WriteDataTimeMs: The time spent performing the write data operation, in milliseconds.

   - CommitAndPublishTimeMs: The time it takes to request the FE to commit and publish the transaction, in milliseconds.

   - ErrorURL: If there is a data quality problem, visit this URL to view the specific error rows.

2. How to correctly submit a Stream Load job and process the returned result.

   Stream Load is a synchronous import operation, so the user needs to wait for the result of the command and decide the next step based on it.

   The user's primary concern is the `Status` field in the returned result.

   If it is `Success`, everything is fine and you can carry on with other operations (a minimal shell sketch of this check follows below).
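   The following is a minimal shell sketch of that flow, not part of the Doris tooling: the host, port, database, table, credentials, label and `data.csv` file name are placeholder assumptions, and the `Status` field is pulled out with a plain `grep` rather than a JSON parser.

   ````bash
   #!/usr/bin/env bash
   # Sketch: submit one Stream Load job and branch on the returned Status.
   # FE_HOST, FE_PORT, DB, TABLE, USER, PASS and data.csv are placeholders.
   set -eu

   FE_HOST="127.0.0.1"
   FE_PORT="8030"
   DB="testDb"
   TABLE="testTbl"
   USER="root"
   PASS=""
   LABEL="load_$(date +%s)"   # keep this label if you need to retry safely

   # -T uploads the file body; --location-trusted keeps the credentials
   # across the FE -> BE redirect performed by Stream Load.
   RESP=$(curl -s --location-trusted -u "${USER}:${PASS}" \
           -H "label:${LABEL}" \
           -H "column_separator:," \
           -T data.csv \
           "http://${FE_HOST}:${FE_PORT}/api/${DB}/${TABLE}/_stream_load")
   echo "${RESP}"

   # Extract the Status field from the JSON response.
   STATUS=$(echo "${RESP}" | grep -o '"Status": *"[^"]*"' | head -n 1 | cut -d'"' -f4)

   case "${STATUS}" in
     "Success"|"Publish Timeout")
       echo "Import ${LABEL} succeeded (Status: ${STATUS})." ;;
     "Label Already Exists")
       # Inspect ExistingJobStatus before resubmitting with the same label.
       echo "Label ${LABEL} already exists; check ExistingJobStatus." ;;
     *)
       echo "Import ${LABEL} failed or returned an unexpected status." >&2
       exit 1 ;;
   esac
   ````

   A real client would also persist the label it used, so that the same label can be reused to resubmit safely after a broken connection, as discussed in the cases below.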
   If the returned result shows a large number of `Publish Timeout` statuses, it may indicate that some cluster resources (such as IO) are currently under strain and the imported data cannot take effect immediately. An import task in the `Publish Timeout` state has succeeded and does not need to be retried, but it is recommended to slow down or pause the submission of new import tasks and observe the cluster load.

   If the returned result is `Fail`, the import failed and the problem needs to be investigated based on the specific reason. Once resolved, you can retry with the same Label.

   In some cases, the user's HTTP connection may be broken abnormally and the final result cannot be obtained. In that case, the import task can be resubmitted with the same Label, and the resubmitted task may produce the following results:

   1. The `Status` is `Success`, `Fail` or `Publish Timeout`. It can then be handled according to the normal process.
   2. The `Status` is `Label Already Exists`. In this case, continue to check the `ExistingJobStatus` field. If its value is `FINISHED`, the import task corresponding to this Label has already succeeded and there is no need to retry. If it is `RUNNING`, the import task corresponding to this Label is still running; keep resubmitting with the same Label at intervals (for example, 10 seconds) until `Status` is no longer `Label Already Exists`, or until the value of `ExistingJobStatus` is `FINISHED`.

3. Cancel the import task

   Import tasks that have been submitted but not yet completed can be canceled with the CANCEL LOAD command. After cancellation, the written data is rolled back and does not take effect.

4. Label, import transactions, multi-table atomicity

   All import tasks in Doris take effect atomically, and importing into multiple tables within the same import task also guarantees atomicity. Doris additionally uses the Label mechanism to ensure that imported data is neither lost nor duplicated. For details, see the [Import Transactions and Atomicity](../../../data-operate/import/import-scenes/load-atomicity.html) documentation.

5. Column mapping, derived columns and filtering

   Doris supports a very rich set of column transformation and filtering operations in import statements, covering most built-in functions and UDFs. For how to use this feature correctly, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document.

6. Error data filtering

   Doris import tasks can tolerate a portion of malformed data. The tolerance ratio is set via `max_filter_ratio`. The default is 0, which means the entire import task fails as soon as there is a single erroneous row. If the user wants to ignore some problematic data rows, this parameter can be set to a value between 0 and 1, and Doris will automatically skip rows with an incorrect data format.

   For details on how the tolerance ratio is calculated, please refer to the [Column Mapping, Conversion and Filtering](../../../data-operate/import/import-scenes/load-data-convert.html) document.

7. Strict Mode

   The `strict_mode` attribute is used to set whether the import task runs in strict mode. Strict mode affects the results of column mapping, transformation, and filtering.
For a detailed description of strict mode, see the [strict mode](../../../data-operate/import/import-scenes/load-strict-mode.html) documentation. + +8. Timeout + + The default timeout for Stream Load is 10 minutes. from the time the task is submitted. If it does not complete within the timeout period, the task fails. + +9. Limits on data volume and number of tasks + + Stream Load is suitable for importing data within a few GB. Because the data is processed by single-threaded transmission, the performance of importing excessively large data cannot be guaranteed. When a large amount of local data needs to be imported, multiple import tasks can be submitted in parallel. + + Doris also limits the number of import tasks running at the same time in the cluster, usually ranging from 10-20. Import jobs submitted after that will be rejected. + diff --git a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md index 11c410f4cb..fbf1e57969 100644 --- a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md +++ b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md @@ -26,10 +26,98 @@ under the License. ## ALTER-ROUTINE-LOAD +### Name + +ALTER ROUTINE LOAD + ### Description +该语法用于修改已经创建的例行导入作业。 + +只能修改处于 PAUSED 状态的作业。 + +语法: + +```sql +ALTER ROUTINE LOAD FOR [db.]job_name +[job_properties] +FROM data_source +[data_source_properties] +``` + +1. `[db.]job_name` + + 指定要修改的作业名称。 + +2. `tbl_name` + + 指定需要导入的表的名称。 + +3. `job_properties` + + 指定需要修改的作业参数。目前仅支持如下参数的修改: + + 1. `desired_concurrent_number` + 2. `max_error_number` + 3. `max_batch_interval` + 4. `max_batch_rows` + 5. `max_batch_size` + 6. `jsonpaths` + 7. `json_root` + 8. `strip_outer_array` + 9. `strict_mode` + 10. `timezone` + 11. `num_as_string` + 12. `fuzzy_parse` + + +4. `data_source` + + 数据源的类型。当前支持: + + KAFKA + +5. `data_source_properties` + + 数据源的相关属性。目前仅支持: + + 1. `kafka_partitions` + 2. `kafka_offsets` + 3. `kafka_broker_list` + 4. `kafka_topic` + 5. 自定义 property,如 `property.group.id` + + 注: + + 1. `kafka_partitions` 和 `kafka_offsets` 用于修改待消费的 kafka partition 的offset,仅能修改当前已经消费的 partition。不能新增 partition。 + ### Example +1. 将 `desired_concurrent_number` 修改为 1 + + ```sql + ALTER ROUTINE LOAD FOR db1.label1 + PROPERTIES + ( + "desired_concurrent_number" = "1" + ); + ``` + +2. 将 `desired_concurrent_number` 修改为 10,修改 partition 的offset,修改 group id。 + + ```sql + ALTER ROUTINE LOAD FOR db1.label1 + PROPERTIES + ( + "desired_concurrent_number" = "10" + ) + FROM kafka + ( + "kafka_partitions" = "0, 1, 2", + "kafka_offsets" = "100, 200, 100", + "property.group.id" = "new_group" + ); + ### Keywords ALTER, ROUTINE, LOAD diff --git a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 1b253e7190..93f5b3ade1 100644 --- a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -26,13 +26,412 @@ under the License. ## BROKER-LOAD +### Name + +BROKER LOAD + ### Description +该命令主要用于通过 Broker 服务进程来导入远端存储(如S3、HDFS)上的数据。 + +```sql +LOAD LABEL load_label +( +data_desc1[, data_desc2, ...] 
+) +WITH BROKER broker_name +[broker_properties] +[load_properties]; +``` + +- `load_label` + + 每个导入需要指定一个唯一的 Label。后续可以通过这个 label 来查看作业进度。 + + `[database.]label_name` + +- `data_desc1` + + 用于描述一组需要导入的文件。 + + ```sql + [MERGE|APPEND|DELETE] + DATA INFILE + ( + "file_path1"[, file_path2, ...] + ) + [NEGATIVE] + INTO TABLE `table_name` + [PARTITION (p1, p2, ...)] + [COLUMNS TERMINATED BY "column_separator"] + [FORMAT AS "file_type"] + [(column_list)] + [COLUMNS FROM PATH AS (c1, c2, ...)] + [PRECEDING FILTER predicate] + [SET (column_mapping)] + [WHERE predicate] + [DELETE ON expr] + [ORDER BY source_sequence] + ``` + + - `[MERGE|APPEND|DELETE]` + + 数据合并类型。默认为 APPEND,表示本次导入是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 `[DELETE ON]` 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示本次导入的所有数据皆为删除数据。 + + - `DATA INFILE` + + 指定需要导入的文件路径。可以是多个。可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入会失败。 + + - `NEGTIVE` + + 该关键词用于表示本次导入为一批”负“导入。这种方式仅针对具有整型 SUM 聚合类型的聚合数据表。该方式会将导入数据中,SUM 聚合列对应的整型数值取反。主要用于冲抵之前导入错误的数据。 + + - `PARTITION(p1, p2, ...)` + + 可以指定仅导入表的某些分区。不再分区范围内的数据将被忽略。 + + - `COLUMNS TERMINATED BY` + + 指定列分隔符。仅在 CSV 格式下有效。仅能指定单字节分隔符。 + + - `FORMAT AS` + + 指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。 + + - `column list` + + 用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤](..../../../data-operate/import/import-scenes/load-data-convert.html) 文档。 + + `(k1, k2, tmpk1)` + + - `COLUMNS FROM PATH AS` + + 指定从导入文件路径中抽取的列。 + + - `PRECEDING FILTER predicate` + + 前置过滤条件。数据首先根据 `column list` 和 `COLUMNS FROM PATH AS` 按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤](../../../data-operate/import/import-scenes/load-data-convert.html) 文档。 + + - `SET (column_mapping)` + + 指定列的转换函数。 + + - `WHERE predicate` + + 根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤](../../../data-operate/import/import-scenes/load-data-convert.html) 文档。 + + - `DELETE ON expr` + + 需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。 + + - `ORDER BY` + + 仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。 + +- `WITH BROKER broker_name` + + 指定需要使用的 Broker 服务名称。在公有云 Doris 中。Broker 服务名称为 `bos` + +- `broker_properties` + + 指定 broker 所需的信息。这些信息通常被用于 Broker 能够访问远端存储系统。如 BOS 或 HDFS。关于具体信息,可参阅 [Broker](../../../advanced/broker.html) 文档。 + + ```text + ( + "key1" = "val1", + "key2" = "val2", + ... + ) + ``` + +- `load_properties` + + 指定导入的相关参数。目前支持以下参数: + + - `timeout` + + 导入超时时间。默认为 4 小时。单位秒。 + + - `max_filter_ratio` + + 最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。取值范围为 0 到 1。 + + - `exec_mem_limit` + + 导入内存限制。默认为 2GB。单位为字节。 + + - `strict_mode` + + 是否对数据进行严格限制。默认为 false。 + + - `timezone` + + 指定某些受时区影响的函数的时区,如 `strftime/alignment_timestamp/from_unixtime` 等等,具体请查阅 [时区](../../advanced/time-zone.html) 文档。如果不指定,则使用 "Asia/Shanghai" 时区 + ### Example +1. 从 HDFS 导入一批数据 + + ```sql + LOAD LABEL example_db.label1 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "," + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ``` + + 导入文件 `file.txt`,按逗号分隔,导入到表 `my_table`。 + +2. 
从 HDFS 导入数据,使用通配符匹配两批两批文件。分别导入到两个表中。 + + ```sql + LOAD LABEL example_db.label2 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*") + INTO TABLE `my_table1` + PARTITION (p1) + COLUMNS TERMINATED BY "," + (k1, tmp_k2, tmp_k3) + SET ( + k2 = tmp_k2 + 1, + k3 = tmp_k3 + 1 + ) + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*") + INTO TABLE `my_table2` + COLUMNS TERMINATED BY "," + (k1, k2, k3) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ``` + + 使用通配符匹配导入两批文件 `file-10*` 和 `file-20*`。分别导入到 `my_table1` 和 `my_table2` 两张表中。其中 `my_table1` 指定导入到分区 `p1` 中,并且将导入源文件中第二列和第三列的值 +1 后导入。 + +3. 从 HDFS 导入一批数据。 + + ```sql + LOAD LABEL example_db.label3 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "\\x01" + ) + WITH BROKER my_hdfs_broker + ( + "username" = "", + "password" = "", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + ``` + + 指定分隔符为 Hive 的默认分隔符 `\\x01`,并使用通配符 * 指定 `data` 目录下所有目录的所有文件。使用简单认证,同时配置 namenode HA。 + +4. 导入 Parquet 格式数据,指定 FORMAT 为 parquet。默认是通过文件后缀判断 + + ```sql + LOAD LABEL example_db.label4 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file") + INTO TABLE `my_table` + FORMAT AS "parquet" + (k1, k2, k3) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ``` + +5. 导入数据,并提取文件路径中的分区字段 + + ```sql + LOAD LABEL example_db.label10 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*") + INTO TABLE `my_table` + FORMAT AS "csv" + (k1, k2, k3) + COLUMNS FROM PATH AS (city, utc_date) + ) + WITH BROKER hdfs + ( + "username"="hdfs_user", + "password"="hdfs_password" + ); + ``` + + `my_table` 表中的列为 `k1, k2, k3, city, utc_date`。 + + 其中 `hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing` 目录下包括如下文件: + + ```text + hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv + hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv + ``` + + 文件中只包含 `k1, k2, k3` 三列数据,`city, utc_date` 这两列数据会从文件路径中提取。 + +6. 对待导入数据进行过滤。 + + ```sql + LOAD LABEL example_db.label6 + ( + DATA INFILE("hdfs://host:port/input/file") + INTO TABLE `my_table` + (k1, k2, k3) + PRECEDING FILTER k1 = 1 + SET ( + k2 = k2 + 1 + ) + WHERE k1 > k2 + ) + WITH BROKER hdfs + ( + "username"="user", + "password"="pass" + ); + ``` + + 只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。 + +7. 导入数据,提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换) + + ```sql + LOAD LABEL example_db.label7 + ( + DATA INFILE("hdfs://host:port/user/data/*/test.txt") + INTO TABLE `tbl12` + COLUMNS TERMINATED BY "," + (k2,k3) + COLUMNS FROM PATH AS (data_time) + SET ( + data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s') + ) + ) + WITH BROKER hdfs + ( + "username"="user", + "password"="pass" + ); + ``` + + 路径下有如下文件: + + ```text + /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt + /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt + ``` + + 表结构为: + + ```text + data_time DATETIME, + k2 INT, + k3 INT + ``` + +8. 
从 HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker 的 broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入 + + ```sql + LOAD LABEL example_db.label8 + ( + MERGE DATA INFILE("HDFS://test:802/input/file") + INTO TABLE `my_table` + (k1, k2, k3, v2, v1) + DELETE ON v2 > 100 + ) + WITH HDFS + ( + "username"="user", + "password"="pass" + ) + PROPERTIES + ( + "timeout" = "3600", + "max_filter_ratio" = "0.1" + ); + ``` + + 使用 MERGE 方式导入。`my_table` 必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。 + + 导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。 + +9. 导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序: + + ```sql + LOAD LABEL example_db.label9 + ( + DATA INFILE("HDFS://test:802/input/file") + INTO TABLE `my_table` + COLUMNS TERMINATED BY "," + (k1,k2,source_sequence,v1,v2) + ORDER BY source_sequence + ) + WITH HDFS + ( + "username"="user", + "password"="pass" + ) + ``` + + `my_table` 必须是是 Unqiue Key 模型表,并且指定了 Sequcence Col。数据会按照源数据中 `source_sequence` 列的值来保证顺序性。 + ### Keywords BROKER, LOAD ### Best Practice +1. 查看导入任务状态 + + Broker Load 是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过 [SHOW LOAD](../../Show-Statements/SHOW-LOAD.html) 命令查看。 + +2. 取消导入任务 + + 已提交切尚未结束的导入任务可以通过 [CANCEL LOAD](./CANCEL-LOAD.html) 命令取消。取消后,已写入的数据也会回滚,不会生效。 + +3. Label、导入事务、多表原子性 + + Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 [导入事务和原子性](../../../data-operate/import/import-scenes/load-atomicity.html) 文档。 + +4. 列映射、衍生列和过滤 + + Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 [列的映射,转换与过滤](../../../data-operate/import/import-scenes/load-data-convert.html) 文档。 + +5. 错误数据过滤 + + Doris 的导入任务可以容忍一部分格式错误的数据。容忍了通过 `max_filter_ratio` 设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果用户希望忽略部分有问题的数据行,可以将次参数设置为 0~1 之间的数值,Doris 会自动跳过哪些数据格式不正确的行。 + + 关于容忍率的一些计算方式,可以参阅 [列的映射,转换与过滤](../../../data-operate/import/import-scenes/load-data-convert.html) 文档。 + +6. 严格模式 + + `strict_mode` 属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。关于严格模式的具体说明,可参阅 [严格模式](../../../data-operate/import/import-scenes/load-strict-mode.html) 文档。 + +7. 超时时间 + + Broker Load 的默认超时时间为 4 小时。从任务提交开始算起。如果在超时时间内没有完成,则任务会失败。 + +8. 数据量和任务数限制 + + Broker Load 适合在一个导入任务中导入100GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限。但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会增加。 + + 同时受限于集群规模,我们限制了导入的最大数据量为 ComputeNode 节点数 * 3GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。 + + Doris 同时会限制集群内同时运行的导入任务数量,通常在 3-10 个不等。之后提交的导入作业会排队等待。队列最大长度为 100。之后的提交会直接拒绝。注意排队时间也被计算到了作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。 diff --git a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md index 4a8adeae6a..2f4ca856a4 100644 --- a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md +++ b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CANCEL-LOAD.md @@ -26,13 +26,43 @@ under the License. ## CANCEL-LOAD +### Name + +CANCEL LOAD + ### Description +该语句用于撤销指定 label 的导入作业。或者通过模糊匹配批量撤销导入作业 + +```sql +CANCEL LOAD +[FROM db_name] +WHERE [LABEL = "load_label" | LABEL like "label_pattern"]; +``` + ### Example +1. 撤销数据库 example_db 上, label 为 `example_db_test_load_label` 的导入作业 + + ```sql + CANCEL LOAD + FROM example_db + WHERE LABEL = "example_db_test_load_label"; + ``` + +2. 撤销数据库 example*db 上, 所有包含 example* 的导入作业。 + + ```sql + CANCEL LOAD + FROM example_db + WHERE LABEL like "example_"; + ``` + ### Keywords CANCEL, LOAD ### Best Practice +1. 只能取消处于 PENDING、ETL、LOADING 状态的未完成的导入作业。 +2. 
当执行批量撤销时,Doris 不会保证所有对应的导入作业原子的撤销。即有可能仅有部分导入作业撤销成功。用户可以通过 SHOW LOAD 语句查看作业状态,并尝试重复执行 CANCEL LOAD 语句。 diff --git a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md index 9078ca38c8..684882a858 100644 --- a/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md +++ b/new-docs/zh-CN/sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md @@ -3,6 +3,7 @@ "title": "CREATE-ROUTINE-LOAD", "language": "zh-CN" } + ---