ROUTINE LOAD
Description
Routine Load enables users to submit a resident import job that continuously reads data from a specified data source and imports it into Doris. Currently, only importing text format (CSV) data from Kafka is supported, through unauthenticated or SSL-authenticated connections.
Syntax:
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
- [db.]job_name
The name of the import job. Within the same database, only one job with the same name can be running at a time.
- tbl_name
Specifies the name of the table to be imported.
- load_properties
Used to describe the imported data. Syntax:
[column_separator], [columns_mapping], [where_predicates], [partitions]
- column_separator:
Specify the column separator, for example:
COLUMNS TERMINATED BY ","
Default: \t
- columns_mapping:
Specifies the mapping between columns in the source data and columns in the destination table, and defines how derived columns are generated.
- Mapping column:
Specify, in order, which columns in the destination table the columns in the source data correspond to. For columns you want to skip, you can specify a column name that does not exist. Suppose the destination table has three columns k1, k2 and v1, and the source data has four columns, of which columns 1, 2 and 4 correspond to k2, k1 and v1 respectively. It is written as follows:
COLUMNS (k2, k1, xxx, v1)
xxx is a non-existent column name, used to skip the third column in the source data.
- Derivative column:
Columns of the form col_name = expr are called derived columns. That is, the value of the corresponding column in the destination table is computed by expr. Derived columns are usually placed after the mapping columns. Although this is not mandatory, Doris always parses the mapping columns first and then the derived columns. Continuing the previous example, suppose the destination table also has a fourth column v2, which is generated by the sum of k1 and k2. It can be written as follows:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
- where_predicates
Used to specify filter conditions to filter out unwanted rows. The filter columns can be mapping columns or derived columns. For example, if we only want to import rows where k1 is greater than 100 and k2 equals 1000, we write as follows:
WHERE k1 > 100 and k2 = 1000
- partitions
Specify which partitions of the destination table to import into. If not specified, data is automatically imported into the corresponding partitions. Example:
PARTITION(p1, p2, p3)
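Putting the clauses above together in the order given by the syntax, a complete load_properties clause might look like the following sketch; the column and partition names are the illustrative ones used above, and the comma-separated layout between clauses follows the examples at the end of this page:
COLUMNS TERMINATED BY ",",
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2),
WHERE k1 > 100 and k2 = 1000,
PARTITION(p1, p2, p3)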
- job_properties
General parameters used to specify the routine import job. Syntax:
PROPERTIES ( "key1" = "val1", "key2" = "val2" )
At present, we support the following parameters:
- desired_concurrent_number
Expected concurrency. A routine import job is divided into multiple subtasks. This parameter specifies how many tasks a job can execute simultaneously. Must be greater than 0. The default is 3. This is the desired concurrency, not the actual concurrency; the actual concurrency is determined from the number of nodes in the cluster, the cluster load, and the data source. Example:
"desired_concurrent_number" = "3"
- max_batch_interval/max_batch_rows/max_batch_size
These three parameters represent, respectively:
- Maximum execution time per sub-task in seconds. The range is 5 to 60. The default is 10.
- The maximum number of rows read by each subtask. Must be greater than or equal to 200000. The default is 200000.
- The maximum number of bytes read by each subtask. Units are bytes, ranging from 100MB to 1GB. The default is 100MB.
These three parameters are used to control the execution time of a subtask and the amount of data it processes. When any one of them reaches its threshold, the subtask ends. Example:
"max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200"
- max_error_number
The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, i.e. no error rows are allowed. The sampling window is max_batch_rows * 10. That is, if the number of error rows within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is needed to check for data quality problems. Rows filtered out by WHERE conditions are not counted as error rows.
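For example, to allow up to 1000 error rows within each sampling window (the value here is illustrative):
"max_error_number" = "1000"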
- data_source
The type of the data source. Currently supported:
KAFKA
- data_source_properties
Specify information about the data source. Syntax:
( "key1" = "val1", "key2" = "val2" )
KAFKA data source:
- kafka_broker_list
Kafka's broker connection information. The format is ip:port. Multiple brokers are separated by commas. Example:
"broker list"= "broker1:9092,broker2:9092"
- kafka_topic
Specify the Kafka topic to subscribe to. Example:
"kafka_topic" = "my_topic"
- kafka_partitions/kafka_offsets
Specify the Kafka partition to be subscribed to and the corresponding initial offset for each partition.
The offset can be a specific value greater than or equal to 0, or:
- OFFSET_BEGINNING: Subscribe from the earliest position where data exists.
- OFFSET_END: Subscribe from the end.
If not specified, all partitions under the topic are subscribed from OFFSET_END by default. Example:
"kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
- property
Specify custom Kafka parameters. The function is equivalent to the "--property" parameter in the Kafka shell. When the value of a parameter is a file, the keyword "FILE:" needs to be added before the value. For how to create a file, see "HELP CREATE FILE;". For more supported custom parameters, see the client-side configuration items in the official CONFIGURATION documentation of librdkafka.
Example:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
When connecting to Kafka with SSL, the following parameters need to be specified:
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
Among them, "property.security.protocol" and "property.ssl.ca.location" are required, to indicate that the connection method is SSL and to specify the location of the CA certificate.
If client authentication is enabled on the Kafka server side, the following also need to be set:
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
These specify the client's public key, the client's private key, and the password of the private key, respectively.
- Import data format sample
Integer classes (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
Floating point classes (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, 356
Date classes (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
String classes (CHAR/VARCHAR) (without quotation marks): I am a student, a
NULL value: \N
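As an illustration only, assuming COLUMNS TERMINATED BY "," and a destination table with an INT column, a DATE column, a VARCHAR column and a nullable column (this schema is hypothetical), a single source row could look like:
1000,2017-10-03,I am a student,\N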
example
- Create a Kafka routine import task named test1 for example_tbl of example_db.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS (k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);
- Import data from a Kafka cluster through SSL authentication, and set the client.id parameter at the same time.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS (k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg",
    "property.client.id" = "my_client_id"
);
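- As an additional sketch only (not one of the original examples), a job that combines a column separator, a skipped source column, a row filter and explicit target partitions could be written roughly as follows. The job name test2, the combination of clauses, and the property values are illustrative; the table, column and partition names are the ones used earlier on this page.
CREATE ROUTINE LOAD example_db.test2 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2),
WHERE k1 > 100,
PARTITION(p1, p2, p3)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "1000"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic"
);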
keyword
CREATE,ROUTINE,LOAD