[typo](doc) spark connector add new param and a way to write bitmap and hll col (#20850)
@ -1,7 +1,7 @@
---
{
"title": "Spark Doris Connector",
"language": "en"
}
---

@ -38,7 +38,7 @@ Github: https://github.com/apache/doris-spark-connector
## Version Compatibility

| Connector | Spark         | Doris       | Java | Scala      |
|-----------|---------------|-------------|------|------------|
| 1.2.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
| 1.1.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
| 1.0.1     | 3.1, 2.3      | 0.12 - 0.15 | 8    | 2.12, 2.11 |

@ -50,13 +50,17 @@ Ready to work
1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh`.

2. Execute the following command in the source directory:
`sh build.sh`
Follow the prompts to enter the Scala and Spark versions you need, and compilation will start.

After the compilation succeeds, the target jar package is generated in the `dist` directory, for example: `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar`.
Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.

For example, upload `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar` to HDFS and add the HDFS file path to `spark.yarn.jars`:

1. Upload `spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar` to HDFS.

@ -71,7 +75,6 @@ hdfs dfs -put /your_local_path/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT.jar
```

## Using Maven

```
@ -87,12 +90,14 @@ spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT
Please replace the Connector version according to the Spark and Scala versions you use.

## Example

### Read

#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
@ -101,7 +106,8 @@ OPTIONS(
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;
```

#### DataFrame

@ -121,6 +127,7 @@ dorisSparkDF.show(5)
```scala
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
@ -132,15 +139,16 @@ val dorisSparkRDD = sc.dorisRDD(
dorisSparkRDD.collect()
```

#### pySpark

```python
dorisSparkDF = spark.read.format("doris") \
    .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") \
    .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") \
    .option("user", "$YOUR_DORIS_USERNAME") \
    .option("password", "$YOUR_DORIS_PASSWORD") \
    .load()
# show 5 lines of data
dorisSparkDF.show(5)
```

@ -150,7 +158,8 @@ dorisSparkDF.show(5)
#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
@ -159,12 +168,17 @@ OPTIONS(
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
```

#### DataFrame (batch/stream)

```scala
// batch sink
val mockDataDF = List(
@ -181,7 +195,7 @@ mockDataDF.write.format("doris")
.option("password", "$YOUR_DORIS_PASSWORD")
// other options
// specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.save()

// stream sink (StructuredStreaming)
@ -201,7 +215,7 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.option("password", "$YOUR_DORIS_PASSWORD")
// other options
// specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
```

@ -210,50 +224,77 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
### General

| Key                              | Default Value     | Comment |
|----------------------------------|-------------------|---------|
| doris.fenodes                    | --                | Doris FE HTTP address; multiple addresses are supported, separated by commas |
| doris.table.identifier           | --                | Doris table identifier, e.g. db1.tbl1 |
| doris.request.retries            | 3                 | Number of retries for sending requests to Doris |
| doris.request.connect.timeout.ms | 30000             | Connection timeout for sending requests to Doris |
| doris.request.read.timeout.ms    | 30000             | Read timeout for sending requests to Doris |
| doris.request.query.timeout.s    | 3600              | Query timeout for Doris; the default is 1 hour, -1 means no timeout limit |
| doris.request.tablet.size        | Integer.MAX_VALUE | The number of Doris Tablets corresponding to one RDD Partition. The smaller this value is set, the more partitions are generated. This increases the parallelism on the Spark side, but also puts more pressure on Doris. |
| doris.read.field                 | --                | List of column names to read from the Doris table, separated by commas |
| doris.batch.size                 | 1024              | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Spark and Doris, and thereby the extra time overhead caused by network latency. |
| doris.exec.mem.limit             | 2147483648        | Memory limit for a single query. The default is 2GB, in bytes. |
| doris.deserialize.arrow.async    | false             | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for spark-doris-connector iteration |
| doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous conversion of the Arrow format; takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields               | --                | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br/>By default, all fields are written in the order of the Doris table fields. |
| sink.batch.size                  | 10000             | Maximum number of rows in a single write to BE |
| sink.max-retries                 | 1                 | Number of retries after a write to BE fails |
| sink.properties.*                | --                | Stream Load parameters.<br/>eg:<br/>'sink.properties.column_separator' = ',' |
| doris.sink.task.partition.size   | --                | The number of partitions corresponding to the write task. After filtering and other operations, the Spark RDD being written may have many partitions, but each partition holds relatively few records, which increases the write frequency and wastes computing resources. The smaller this value is set, the lower the Doris write frequency and the lower the Doris merge pressure. It is generally used together with doris.sink.task.use.repartition. |
| doris.sink.task.use.repartition  | false             | Whether to use repartition mode to control the number of partitions written to Doris. The default value is false, and coalesce is used (note: if there is no Spark action before the write, the parallelism of the whole computation may be reduced). If it is set to true, repartition is used (note: you can set the final number of partitions at the cost of an extra shuffle). |
| doris.sink.batch.interval.ms     | 50                | The interval between each batch sink, in ms. |

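For example, the newly added `doris.read.field` option can be combined with the read options shown earlier to have Doris return only the needed columns. A minimal sketch, assuming placeholder column names:

```scala
// Read only selected columns from Doris; "col1,col2" is a placeholder column list.
val prunedDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("doris.read.field", "col1,col2") // comma-separated list of Doris columns to read
  .option("doris.batch.size", "4096")      // optional: read more rows per fetch from BE
  .load()
```
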
### SQL & Dataframe Configuration

| Key                             | Default Value | Comment |
|---------------------------------|---------------|---------|
| user                            | --            | Doris username |
| password                        | --            | Doris password |
| doris.filter.query.in.max.count | 100           | In predicate pushdown, the maximum number of elements in the value list of an `in` expression. If this number is exceeded, the `in`-expression filtering is processed on the Spark side. |
| doris.ignore-type               | --            | In a temporary view, the field types to ignore when reading the schema.<br/>eg: 'doris.ignore-type'='bitmap,hll' |

* Note: In Spark SQL, when writing data through `insert into`, if the target Doris table contains columns of type `BITMAP` or `HLL`, you need to set the parameter `doris.ignore-type` to the corresponding type and map the corresponding columns with `doris.write.fields`. The usage is as follows:
> 1. BITMAP
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
> OPTIONS(
> "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
> "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
> "user"="$YOUR_DORIS_USERNAME",
> "password"="$YOUR_DORIS_PASSWORD",
> "doris.ignore-type"="bitmap",
> "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
> );
> ```
> 2. HLL
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
> OPTIONS(
> "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
> "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
> "user"="$YOUR_DORIS_USERNAME",
> "password"="$YOUR_DORIS_PASSWORD",
> "doris.ignore-type"="hll",
> "doris.write.fields"="col1,hll_col1=hll_hash(col1)"
> );
> ```

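Continuing the BITMAP example above, a hedged sketch of the write itself: with `doris.ignore-type` set, the temporary view exposes only the non-BITMAP columns, so the `INSERT` supplies those and Doris derives the bitmap columns from the `doris.write.fields` mapping. The source table and column names below are placeholders:

```scala
// Hypothetical source table; col1/col2/col3 match the non-BITMAP columns of the view above.
spark.sql(
  """
    |INSERT INTO spark_doris
    |SELECT col1, col2, col3
    |FROM your_source_table
    |""".stripMargin)
```
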
### RDD Configuration

| Key                         | Default Value | Comment |
|-----------------------------|---------------|---------|
| doris.request.auth.user     | --            | Doris username |
| doris.request.auth.password | --            | Doris password |
| doris.filter.query          | --            | Filter expression for the query, which is passed through to Doris. Doris uses this expression to complete source-side data filtering. |

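As a hedged sketch of how these RDD options can be passed (building on the `dorisRDD` example earlier; the filter expression is a placeholder):

```scala
import org.apache.doris.spark._

// cfg carries the RDD-specific options; doris.filter.query is pushed down to Doris.
val filteredRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD",
    "doris.filter.query" -> "age > 18" // placeholder filter, evaluated on the Doris side
  ))
)
filteredRDD.collect()
```
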
## Doris & Spark Column Type Mapping

| Doris Type | Spark Type                       |
|------------|----------------------------------|
| NULL_TYPE  | DataTypes.NullType               |
| BOOLEAN    | DataTypes.BooleanType            |
| TINYINT    | DataTypes.ByteType               |
@ -273,4 +314,6 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| TIME       | DataTypes.DoubleType             |
| HLL        | Unsupported datatype             |

* Note: In the Connector, `DATE` and `DATETIME` are mapped to `String`. Due to the processing logic of the Doris underlying storage engine, using the time types directly cannot cover the required time range, so the `String` type is used to return the corresponding readable time text directly.

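Since these columns come back as `String`, they can be cast back to native Spark types when needed. A minimal sketch with hypothetical column names and formats:

```scala
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

// dt_col / ts_col are hypothetical DATE / DATETIME columns read from Doris as String.
val typedDF = dorisSparkDF
  .withColumn("dt_col", to_date(col("dt_col"), "yyyy-MM-dd"))
  .withColumn("ts_col", to_timestamp(col("ts_col"), "yyyy-MM-dd HH:mm:ss"))
```
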
@ -1,7 +1,7 @@
---
{
"title": "Spark Doris Connector",
"language": "zh-CN"
}
---

@ -38,7 +38,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据
## Version Compatibility

| Connector | Spark         | Doris       | Java | Scala      |
|-----------|---------------|-------------|------|------------|
| 1.2.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
| 1.1.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
| 1.0.1     | 3.1, 2.3      | 0.12 - 0.15 | 8    | 2.12, 2.11 |

@ -50,15 +50,16 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据
1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh`.

2. Execute the following command in the source directory:
`sh build.sh`
Follow the prompts to enter the Scala and Spark versions you need, and compilation will start.

After the compilation succeeds, the target jar package is generated in the `dist` directory, for example: `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`.
Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`.

For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.

For example, upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to HDFS and add the HDFS path of the jar package to the `spark.yarn.jars` parameter:

1. Upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to HDFS.

@ -88,12 +89,14 @@ spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT
Please replace the Connector version according to the Spark and Scala versions you use.

## Usage Example

### Read

#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
@ -102,7 +105,8 @@ OPTIONS(
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;
```

#### DataFrame

@ -122,6 +126,7 @@ dorisSparkDF.show(5)
```scala
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
@ -138,11 +143,11 @@ dorisSparkRDD.collect()
```python
dorisSparkDF = spark.read.format("doris") \
    .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") \
    .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") \
    .option("user", "$YOUR_DORIS_USERNAME") \
    .option("password", "$YOUR_DORIS_PASSWORD") \
    .load()
# show 5 lines of data
dorisSparkDF.show(5)
```

@ -152,7 +157,8 @@ dorisSparkDF.show(5)
#### SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
@ -161,9 +167,13 @@ OPTIONS(
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
```

#### DataFrame (batch/stream)

@ -184,7 +194,7 @@ mockDataDF.write.format("doris")
.option("password", "$YOUR_DORIS_PASSWORD")
// other options
// specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.save()

// stream sink (StructuredStreaming)
@ -204,62 +214,91 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.option("password", "$YOUR_DORIS_PASSWORD")
// other options
// specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
```

### Java Example

A Java example is provided under `samples/doris-demo/spark-demo/` for reference: [here](https://github.com/apache/incubator-doris/tree/master/samples/doris-demo/spark-demo)

## Configuration

### General

| Key                              | Default Value     | Comment |
|----------------------------------|-------------------|---------|
| doris.fenodes                    | --                | Doris FE HTTP address; multiple addresses are supported, separated by commas |
| doris.table.identifier           | --                | Doris table name, e.g. db1.tbl1 |
| doris.request.retries            | 3                 | Number of retries for sending requests to Doris |
| doris.request.connect.timeout.ms | 30000             | Connection timeout for sending requests to Doris |
| doris.request.read.timeout.ms    | 30000             | Read timeout for sending requests to Doris |
| doris.request.query.timeout.s    | 3600              | Query timeout for Doris; the default is 1 hour, -1 means no timeout limit |
| doris.request.tablet.size        | Integer.MAX_VALUE | The number of Doris Tablets corresponding to one RDD Partition.<br />The smaller this value is set, the more partitions are generated, which increases the parallelism on the Spark side but also puts more pressure on Doris. |
| doris.read.field                 | --                | List of column names to read from the Doris table, separated by commas |
| doris.batch.size                 | 1024              | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Spark and Doris,<br />and thereby the extra time overhead caused by network latency. |
| doris.exec.mem.limit             | 2147483648        | Memory limit for a single query. The default is 2GB, in bytes. |
| doris.deserialize.arrow.async    | false             | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for spark-doris-connector iteration |
| doris.deserialize.queue.size     | 64                | Internal processing queue for asynchronous conversion of the Arrow format; takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields               | --                | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br />By default, all fields are written in the order of the Doris table fields. |
| sink.batch.size                  | 10000             | Maximum number of rows in a single write to BE |
| sink.max-retries                 | 1                 | Number of retries after a write to BE fails |
| sink.properties.*                | --                | Stream Load import parameters.<br/>For example: 'sink.properties.column_separator' = ',' |
| doris.sink.task.partition.size   | --                | The number of partitions corresponding to the Doris write task. After filtering and other operations, the Spark RDD being written may have many partitions, but each partition holds relatively few records, which increases the write frequency and wastes computing resources.<br/>The smaller this value is set, the lower the Doris write frequency and the lower the Doris merge pressure. It is generally used together with doris.sink.task.use.repartition. |
| doris.sink.task.use.repartition  | false             | Whether to use repartition mode to control the number of partitions written to Doris. The default value is false, and coalesce is used (note: if there is no Spark action before the write, the parallelism of the whole computation may be reduced).<br/>If it is set to true, repartition is used (note: you can set the final number of partitions at the cost of an extra shuffle). |
| doris.sink.batch.interval.ms     | 50                | The interval between each batch sink, in ms. |

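For example, the sink-side options above are passed the same way as in the write examples; a minimal sketch, assuming `resultDF` is a DataFrame you want to write and the option values are purely illustrative:

```scala
// Placeholder DataFrame write; sink.* values below are illustrative, not recommendations.
resultDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("sink.batch.size", "10000")              // max rows per write to BE
  .option("sink.max-retries", "3")                 // retries after a failed write
  .option("sink.properties.column_separator", ",") // extra Stream Load property
  .save()
```
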
### SQL & Dataframe Configuration

| Key                             | Default Value | Comment |
|---------------------------------|---------------|---------|
| user                            | --            | Doris username |
| password                        | --            | Doris password |
| doris.filter.query.in.max.count | 100           | In predicate pushdown, the maximum number of elements in the value list of an `in` expression. If this number is exceeded, the `in`-expression filtering is processed on the Spark side. |
| doris.ignore-type               | --            | In a temporary view, the field types to ignore when reading the schema.<br/>For example, 'doris.ignore-type'='bitmap,hll' |

* Note: In Spark SQL, when writing data through `insert into`, if the target Doris table contains columns of type `BITMAP` or `HLL`, you need to set the parameter `doris.ignore-type` to the corresponding type and map the corresponding columns with `doris.write.fields`. The usage is as follows:
> 1. BITMAP
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
> OPTIONS(
> "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
> "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
> "user"="$YOUR_DORIS_USERNAME",
> "password"="$YOUR_DORIS_PASSWORD",
> "doris.ignore-type"="bitmap",
> "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
> );
> ```
> 2. HLL
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
> OPTIONS(
> "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
> "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
> "user"="$YOUR_DORIS_USERNAME",
> "password"="$YOUR_DORIS_PASSWORD",
> "doris.ignore-type"="hll",
> "doris.write.fields"="col1,hll_col1=hll_hash(col1)"
> );
> ```

### RDD Configuration

| Key                         | Default Value | Comment |
|-----------------------------|---------------|---------|
| doris.request.auth.user     | --            | Doris username |
| doris.request.auth.password | --            | Doris password |
| doris.filter.query          | --            | Filter expression for the query, which is passed through to Doris. Doris uses this expression to complete source-side data filtering. |

## Doris & Spark Column Type Mapping

| Doris Type | Spark Type                       |
|------------|----------------------------------|
| NULL_TYPE  | DataTypes.NullType               |
| BOOLEAN    | DataTypes.BooleanType            |
| TINYINT    | DataTypes.ByteType               |