From 365afb538941a2ee2113b7ffd00abe179eceac07 Mon Sep 17 00:00:00 2001 From: HonestManXin Date: Thu, 20 Jul 2023 14:36:00 +0800 Subject: [PATCH] [fix](sparkdpp) Hive table properties not take effect when create spark session (#21881) When creating a Hive external table for Spark loading, the Hive external table includes related information such as the Hive Metastore. However, when submitting a job, it is required to have the hive-site.xml file in the Spark conf directory; otherwise, the Spark job may fail with an error message indicating that the corresponding Hive table cannot be found. The SparkEtlJob.initSparkConfigs method sets the properties of the external table into the Spark conf. However, at this point, the Spark session has already been created, and the Hive-related parameters will not take effect. To ensure that the Spark Hive catalog properly loads Hive tables, you need to set the Hive-related parameters before creating the Spark session. Co-authored-by: zhangshixin --- .../doris/load/loadv2/etl/SparkEtlJob.java | 29 ++++++--- .../load/loadv2/etl/SparkEtlJobTest.java | 63 +++++++++++++++---- 2 files changed, 73 insertions(+), 19 deletions(-) diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java index 34f4c0a8c3..484ff4eca2 100644 --- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java @@ -30,14 +30,21 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.io.CharStreams; import org.apache.commons.collections.map.MultiValueMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; +import org.apache.spark.deploy.SparkHadoopUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,6 +69,7 @@ public class SparkEtlJob { private Set hiveSourceTables; private Map> tableToBitmapDictColumns; private Map> tableToBinaryBitmapColumns; + private final SparkConf conf; private SparkSession spark; private SparkEtlJob(String jobConfigFilePath) { @@ -70,10 +78,10 @@ public class SparkEtlJob { this.hiveSourceTables = Sets.newHashSet(); this.tableToBitmapDictColumns = Maps.newHashMap(); this.tableToBinaryBitmapColumns = Maps.newHashMap(); + conf = new SparkConf(); } - private void initSparkEnvironment() { - SparkConf conf = new SparkConf(); + private void initSpark() { //serialization conf conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator"); @@ -86,14 +94,19 @@ public class SparkEtlJob { return; } for (Map.Entry entry : configs.entrySet()) { - spark.sparkContext().conf().set(entry.getKey(), entry.getValue()); + conf.set(entry.getKey(), entry.getValue()); + conf.set("spark.hadoop." + entry.getKey(), entry.getValue()); } } - private void initConfig() { + private void initConfig() throws IOException { LOG.debug("job config file path: " + jobConfigFilePath); - Dataset ds = spark.read().textFile(jobConfigFilePath); - String jsonConfig = ds.first(); + Configuration hadoopConf = SparkHadoopUtil.get().newConfiguration(this.conf); + String jsonConfig; + Path path = new Path(jobConfigFilePath); + try (FileSystem fs = path.getFileSystem(hadoopConf); DataInputStream in = fs.open(path)) { + jsonConfig = CharStreams.toString(new InputStreamReader(in)); + } LOG.debug("rdd read json config: " + jsonConfig); etlJobConfig = EtlJobConfig.configFromJson(jsonConfig); LOG.debug("etl job config: " + etlJobConfig); @@ -241,12 +254,12 @@ public class SparkEtlJob { } } + initSpark(); // data partition sort and aggregation processDpp(); } private void run() throws Exception { - initSparkEnvironment(); initConfig(); checkConfig(); processData(); diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java index 8b94937805..0ea7f66092 100644 --- a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java +++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java @@ -31,14 +31,19 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import mockit.Expectations; -import mockit.Injectable; import mockit.Mocked; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SparkSession; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -92,20 +97,15 @@ public class SparkEtlJobTest { } @Test - public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset ds) { + public void testInitConfig(@Mocked FileSystem fs) throws IOException { new Expectations() { { - SparkSession.builder().enableHiveSupport().getOrCreate(); - result = spark; - spark.read().textFile(anyString); - result = ds; - ds.first(); - result = etlJobConfig.configToJson(); + fs.open(new Path("hdfs://127.0.0.1:10000/jobconfig.json")); + result = new FSDataInputStream(new SeekableByteArrayInputStream(etlJobConfig.configToJson().getBytes())); } }; SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json"); - Deencapsulation.invoke(job, "initSparkEnvironment"); Deencapsulation.invoke(job, "initConfig"); EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig"); Assert.assertTrue(parsedConfig.tables.containsKey(tableId)); @@ -150,4 +150,45 @@ public class SparkEtlJobTest { // check remove v2 bitmap_dict func mapping from file group column mappings Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2")); } + + private static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable { + public SeekableByteArrayInputStream(byte[] buf) { + super(buf); + } + + public void seek(long position) { + if (position < 0 || position >= buf.length) { + throw new IllegalArgumentException("pos = " + position + " length = " + buf.length); + } + this.pos = (int) position; + } + + public long getPos() { + return this.pos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + this.seek(position); + return this.read(buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + if (position + length > buf.length) { + throw new EOFException("End of file reached before reading fully."); + } + System.arraycopy(buf, (int) position, buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + readFully(position, buffer, 0, buffer.length); + } + } }