From 60d5e4dfce1e0bb03023d632c8ded0b50e2be386 Mon Sep 17 00:00:00 2001
From: liujinhui <965147871@qq.com>
Date: Thu, 20 Oct 2022 08:59:22 +0800
Subject: [PATCH] [improvement](spark-load) support parquet and orc file (#13438)

Add support for parquet/orc in SparkDpp.java
Fix SparkDpp checkstyle issue
---
 .../doris/load/loadv2/dpp/SparkDpp.java       | 28 +++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 74357a9afe..d64c1080c8 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -22,6 +22,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 
 import com.google.common.base.Strings;
 import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-
 // This class is a Spark-based data preprocessing program,
 // which will make use of the distributed compute framework of spark to
 // do ETL job/sort/preaggregate jobs in spark job
@@ -87,6 +87,7 @@ import java.util.Set;
 // 2. repartition data by using doris data model(partition and bucket)
 // 3. process aggregation if needed
 // 4. write data to parquet file
+
 public final class SparkDpp implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
 
@@ -212,7 +213,6 @@ public final class SparkDpp implements java.io.Serializable {
                         continue;
                     }
 
-
                     String curBucketKey = keyColumns.get(0).toString();
                     List<Object> columnObjects = new ArrayList<>();
                     for (int i = 1; i < keyColumns.size(); ++i) {
@@ -620,6 +620,30 @@ public final class SparkDpp implements java.io.Serializable {
         if (fileGroup.columnsFromPath != null) {
             srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
         }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
+            // parquet has its own schema, just use it; perhaps we could add some validation in the future.
+            Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
+            Dataset<Row> dataFrame = spark.read().orc(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
         StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
         JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
         int columnSize = dataSrcColumns.size();
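
For context, the sketch below illustrates (outside the patch itself) the Spark pattern the new branches rely on: read a parquet file using its embedded schema and attach each "column from path" value as a constant column via functions.lit(). This is a minimal, hypothetical example; the class name, file path, and column names are illustrative assumptions and not code from SparkDpp.

// Standalone sketch of the parquet + columns-from-path pattern used above.
// Assumes Spark SQL is on the classpath; all names and paths are hypothetical.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.util.Arrays;
import java.util.List;

public class ParquetPathColumnsExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("parquet-path-columns-example")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical input file and path-derived columns, e.g. from a path
        // like .../city=beijing/date=2022-10-20/file.parquet
        String fileUrl = "/tmp/example/file.parquet";
        List<String> columnsFromPath = Arrays.asList("city", "date");
        List<String> columnValueFromPath = Arrays.asList("beijing", "2022-10-20");

        // Parquet carries its own schema, so no source schema needs to be built.
        Dataset<Row> dataFrame = spark.read().parquet(fileUrl);

        // Append each path-derived value as a constant column, mirroring the
        // loop added in SparkDpp for parquet and orc file groups.
        for (int k = 0; k < columnValueFromPath.size(); k++) {
            dataFrame = dataFrame.withColumn(columnsFromPath.get(k),
                    functions.lit(columnValueFromPath.get(k)));
        }

        dataFrame.show();
        spark.stop();
    }
}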