Doris-spark connector examples (#6485)

* doris spark connector examples

* add usage documentation and license

Co-authored-by: shengy <whyMy2017>
This commit is contained in:
月眸
2021-08-27 10:57:11 +08:00
committed by GitHub
parent 3f2fdd236f
commit ace21ebf83
4 changed files with 326 additions and 0 deletions

View File

@ -0,0 +1,74 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Instructions for use
This series of sample codes mainly shows how to use spark and spark doris connector to read data from doris table.
here give some code examples using java and scala language.
```java
org.apache.doris.demo.spark.DorisSparkConnectionExampleJava
org.apache.doris.demo.spark.DorisSparkConnectionExampleScala
```
**Note:** Because the Spark doris connector jar file is not in the Maven central repository, you need to compile it separately and add to the classpath of your project. Refer to the compilation and use of Spark doris connector:
[Spark doris connector](https://doris.apache.org/master/zh-CN/extending-doris/spark-doris-connector.html)
1. First, create a table in doris with any mysql client
```sql
CREATE TABLE `example_table` (
`id` bigint(20) NOT NULL COMMENT "ID",
`name` varchar(100) NOT NULL COMMENT "Name",
`age` int(11) NOT NULL COMMENT "Age"
) ENGINE = OLAP
UNIQUE KEY(`id`)
COMMENT "example table"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
```
2. Insert some test data to example_table
```sql
insert into example_table values(1,"xx1",21);
insert into example_table values(2,"xx2",21);
```
3. Set doris config in this class
change the Doris DORIS_DB, DORIS_TABLE, DORIS_FE_IP, DORIS_FE_HTTP_PORT,
DORIS_FE_QUERY_PORT, DORIS_USER, DORIS_PASSWORD config in this class
4. Run this class, you should see the output:
```shell
+---+----+---+
| id|name|age|
+---+----+---+
| 1| xx1| 21|
| 2| xx2| 21|
+---+----+---+
```

View File

@ -32,6 +32,43 @@ under the License.
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spark.version>2.3.4</spark.version>
<scala.version>2.11</scala.version>
<scala-library.version>2.11.12</scala-library.version>
</properties>
<dependencies>
<!-- doris spark -->
<dependency>
<groupId>org.apache</groupId>
<artifactId>doris-spark</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala-library.version}</version>
</dependency>
<!-- mysql connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.demo.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* This class is a java demo for doris spark connector,
* and provides three ways to read doris table using spark.
* before you run this class, you need to build doris-spark,
* and put the doris-spark jar file in your maven repository
*/
public class DorisSparkConnectionExampleJava {
private static final String DORIS_DB = "demo";
private static final String DORIS_TABLE = "example_table";
private static final String DORIS_FE_IP = "your doris fe ip";
private static final String DORIS_FE_HTTP_PORT = "8030";
private static final String DORIS_FE_QUERY_PORT = "9030";
private static final String DORIS_USER = "your doris user";
private static final String DORIS_PASSWORD = "your doris password";
public static void main(String[] args) {
SparkSession sc = SparkSession.builder().master("local[*]").appName("test").getOrCreate();
readWithDataFrame(sc);
//readWithSparkSql(sc);
//readWithJdbc(sc);
}
/**
* read doris table Using DataFrame
*
* @param sc SparkSession
*/
private static void readWithDataFrame(SparkSession sc) {
Dataset<Row> df = sc.read().format("doris")
.option("doris.table.identifier", String.format("%s.%s", DORIS_DB, DORIS_TABLE))
.option("doris.fenodes", String.format("%s:%s", DORIS_FE_IP, DORIS_FE_HTTP_PORT))
.option("user", DORIS_USER)
.option("password", DORIS_PASSWORD)
.load();
df.show(5);
}
/**
* read doris table Using Spark Sql
*
* @param sc SparkSession
*/
private static void readWithSparkSql(SparkSession sc) {
sc.sql("CREATE TEMPORARY VIEW spark_doris " +
"USING doris " +
"OPTIONS( " +
" \"table.identifier\"=\"" + DORIS_DB + "." + DORIS_TABLE + "\", " +
" \"fenodes\"=\"" + DORIS_FE_IP + ":" + DORIS_FE_HTTP_PORT + "\", " +
" \"user\"=\"" + DORIS_USER + "\", " +
" \"password\"=\"" + DORIS_PASSWORD + "\" " +
")");
sc.sql("select * from spark_doris").show(5);
}
/**
* read doris table Using jdbc
*
* @param sc SparkSession
*/
private static void readWithJdbc(SparkSession sc) {
Dataset<Row> df = sc.read().format("jdbc")
.option("url", String.format("jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=utf-8", DORIS_FE_IP, DORIS_FE_QUERY_PORT, DORIS_DB))
.option("dbtable", DORIS_TABLE)
.option("user", DORIS_USER)
.option("password", DORIS_PASSWORD)
.load();
df.show(5);
}
}

View File

@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.demo.spark
/**
* This class is a scala demo for doris spark connector,
* and provides four ways to read doris tables using spark.
* before you run this class, you need to build doris-spark,
* and put the doris-spark jar file in your maven repository
*/
object DorisSparkConnectionExampleScala {
val DORIS_DB = "demo"
val DORIS_TABLE = "example_table"
val DORIS_FE_IP = "your doris fe ip"
val DORIS_FE_HTTP_PORT = "8030"
val DORIS_FE_QUERY_PORT = "9030"
val DORIS_USER = "your doris user"
val DORIS_PASSWORD = "your doris password"
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
// if you want to run readWithRdd(sparkConf), please comment this line
// val sc = SparkSession.builder().config(sparkConf).getOrCreate()
readWithRdd(sparkConf)
// readWithDataFrame(sc)
// readWithSql(sc)
// readWithJdbc(sc)
}
/**
* read doris table Using Spark Rdd
*/
def readWithRdd(sparkConf: SparkConf): Unit = {
val scf = new SparkContextFunctions(new SparkContext(sparkConf))
val rdd = scf.dorisRDD(
tableIdentifier = Some(s"$DORIS_DB.$DORIS_TABLE"),
cfg = Some(Map(
"doris.fenodes" -> s"$DORIS_FE_IP:$DORIS_FE_HTTP_PORT",
"doris.request.auth.user" -> DORIS_USER,
"doris.request.auth.password" -> DORIS_PASSWORD
))
)
val resultArr = rdd.collect()
println(resultArr.mkString)
}
/**
* read doris table Using DataFrame
*
* @param sc SparkSession
*/
def readWithDataFrame(sc: SparkSession): Unit = {
val df = sc.read.format("doris")
.option("doris.table.identifier", s"$DORIS_DB.$DORIS_TABLE")
.option("doris.fenodes", s"$DORIS_FE_IP:$DORIS_FE_HTTP_PORT")
.option("user", DORIS_USER)
.option("password", DORIS_PASSWORD)
.load()
df.show(5)
}
/**
* read doris table Using Spark Sql
*
* @param sc SparkSession
*/
def readWithSql(sc: SparkSession): Unit = {
sc.sql("CREATE TEMPORARY VIEW spark_doris\n" +
"USING doris " +
"OPTIONS( " +
" \"table.identifier\"=\"" + DORIS_DB + "." + DORIS_TABLE + "\", " +
" \"fenodes\"=\"" + DORIS_FE_IP + ":" + DORIS_FE_HTTP_PORT + "\", " +
" \"user\"=\"" + DORIS_USER + "\", " +
" \"password\"=\"" + DORIS_PASSWORD + "\" " +
")")
sc.sql("select * from spark_doris").show(5)
}
/**
* read doris table Using jdbc
*
* @param sc SparkSession
*/
def readWithJdbc(sc: SparkSession): Unit = {
val df = sc.read.format("jdbc")
.option("url", s"jdbc:mysql://$DORIS_FE_IP:$DORIS_FE_QUERY_PORT/$DORIS_DB?useUnicode=true&characterEncoding=utf-8")
.option("dbtable", DORIS_TABLE)
.option("user", DORIS_USER)
.option("password", DORIS_PASSWORD)
.load()
df.show(5)
}
}