[improvement](spark-load) support parquet and orc file (#13438)
Add support for parquet and orc files in SparkDpp.java; fixed a SparkDpp checkstyle issue.
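For context on what the change builds on: a minimal standalone sketch of the Spark DataFrameReader calls used for these self-describing formats. The session settings and file paths below are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadSelfDescribingFormatsSketch {
    public static void main(String[] args) {
        // Illustrative only: a local session and example paths, not the spark-load job config.
        SparkSession spark = SparkSession.builder()
                .appName("parquet-orc-read-sketch")
                .master("local[*]")
                .getOrCreate();

        // Parquet and ORC carry their own schema, so no column list is needed up front.
        Dataset<Row> parquetDf = spark.read().parquet("hdfs:///path/to/source.parquet");
        Dataset<Row> orcDf = spark.read().orc("hdfs:///path/to/source.orc");

        parquetDf.printSchema();
        orcDf.printSchema();

        spark.stop();
    }
}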
@@ -22,6 +22,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// This class is a Spark-based data preprocessing program,
// which will make use of the distributed compute framework of spark to
// do ETL job/sort/preaggregate jobs in spark job
@@ -87,6 +87,7 @@ import java.util.Set;
// 2. repartition data by using doris data model(partition and bucket)
// 3. process aggregation if needed
// 4. write data to parquet file

public final class SparkDpp implements java.io.Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
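The class comment above outlines the DPP flow; as a rough illustration of step 2 (repartitioning by the Doris data model), here is a hedged sketch. The column name, bucket count, and paths are invented for illustration and do not reflect SparkDpp's actual partitioning code.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class RepartitionByBucketKeySketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("repartition-sketch")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical input; SparkDpp derives the real bucket key from the Doris table model.
        Dataset<Row> df = spark.read().parquet("hdfs:///path/to/etl_input.parquet");

        // Step 2 in spirit: co-locate rows of the same bucket by repartitioning
        // on a bucket-key column before aggregation and the final write.
        int bucketNum = 16; // illustrative bucket count
        Dataset<Row> bucketed = df.repartition(bucketNum, functions.col("bucket_key"));

        // Step 4 in spirit: persist the repartitioned result as parquet.
        bucketed.write().mode("overwrite").parquet("hdfs:///path/to/etl_output");

        spark.stop();
    }
}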
@@ -212,7 +213,6 @@ public final class SparkDpp implements java.io.Serializable {
    continue;
}

String curBucketKey = keyColumns.get(0).toString();
List<Object> columnObjects = new ArrayList<>();
for (int i = 1; i < keyColumns.size(); ++i) {
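In the hunk above, the first element of keyColumns is treated as the bucket key and the remaining elements as the key column values. A small self-contained sketch of that split, with invented sample data:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketKeySplitSketch {
    public static void main(String[] args) {
        // Hypothetical tuple: element 0 is the bucket key, the rest are key column values.
        List<Object> keyColumns = Arrays.asList("10023_5", 2023, "beijing");

        String curBucketKey = keyColumns.get(0).toString();
        List<Object> columnObjects = new ArrayList<>();
        for (int i = 1; i < keyColumns.size(); ++i) {
            columnObjects.add(keyColumns.get(i));
        }

        System.out.println("bucket key: " + curBucketKey);
        System.out.println("key column values: " + columnObjects);
    }
}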
@@ -620,6 +620,30 @@ public final class SparkDpp implements java.io.Serializable {
if (fileGroup.columnsFromPath != null) {
    srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
}

if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
    // parquet had its own schema, just use it; perhaps we could add some validation in future.
    Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
    if (!CollectionUtils.isEmpty(columnValueFromPath)) {
        for (int k = 0; k < columnValueFromPath.size(); k++) {
            dataFrame = dataFrame.withColumn(
                    fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
        }
    }
    return dataFrame;
}

if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
    Dataset<Row> dataFrame = spark.read().orc(fileUrl);
    if (!CollectionUtils.isEmpty(columnValueFromPath)) {
        for (int k = 0; k < columnValueFromPath.size(); k++) {
            dataFrame = dataFrame.withColumn(
                    fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
        }
    }
    return dataFrame;
}

StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
int columnSize = dataSrcColumns.size();
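The new branches read the file with its embedded schema and then append any columns derived from the file path as constant columns. A runnable sketch of the same withColumn/functions.lit pattern in isolation; the paths, column names, and values are illustrative assumptions:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ColumnsFromPathSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("columns-from-path-sketch")
                .master("local[*]")
                .getOrCreate();

        // Illustrative: column names and the values extracted from a path such as
        // hdfs:///load/city=beijing/date=2022-10-18/part-0.parquet
        List<String> columnsFromPath = Arrays.asList("city", "date");
        List<String> columnValueFromPath = Arrays.asList("beijing", "2022-10-18");

        Dataset<Row> dataFrame =
                spark.read().parquet("hdfs:///load/city=beijing/date=2022-10-18/part-0.parquet");

        // Same pattern as the new parquet/orc branches: each path-derived column is
        // appended as a literal (constant) column on every row.
        for (int k = 0; k < columnValueFromPath.size(); k++) {
            dataFrame = dataFrame.withColumn(
                    columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
        }

        dataFrame.show();
        spark.stop();
    }
}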