diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 34f4c0a8c3..484ff4eca2 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -30,14 +30,21 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
 import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.deploy.SparkHadoopUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +69,7 @@ public class SparkEtlJob {
     private Set<String> hiveSourceTables;
     private Map<Long, Set<String>> tableToBitmapDictColumns;
     private Map<Long, Set<String>> tableToBinaryBitmapColumns;
+    private final SparkConf conf;
     private SparkSession spark;
 
     private SparkEtlJob(String jobConfigFilePath) {
@@ -70,10 +78,10 @@
         this.hiveSourceTables = Sets.newHashSet();
         this.tableToBitmapDictColumns = Maps.newHashMap();
         this.tableToBinaryBitmapColumns = Maps.newHashMap();
+        conf = new SparkConf();
     }
 
-    private void initSparkEnvironment() {
-        SparkConf conf = new SparkConf();
+    private void initSpark() {
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
@@ -86,14 +94,19 @@
             return;
         }
         for (Map.Entry<String, String> entry : configs.entrySet()) {
-            spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
+            conf.set(entry.getKey(), entry.getValue());
+            conf.set("spark.hadoop." + entry.getKey(), entry.getValue());
        }
     }
 
-    private void initConfig() {
+    private void initConfig() throws IOException {
         LOG.debug("job config file path: " + jobConfigFilePath);
-        Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
-        String jsonConfig = ds.first();
+        Configuration hadoopConf = SparkHadoopUtil.get().newConfiguration(this.conf);
+        String jsonConfig;
+        Path path = new Path(jobConfigFilePath);
+        try (FileSystem fs = path.getFileSystem(hadoopConf); DataInputStream in = fs.open(path)) {
+            jsonConfig = CharStreams.toString(new InputStreamReader(in));
+        }
         LOG.debug("rdd read json config: " + jsonConfig);
         etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
         LOG.debug("etl job config: " + etlJobConfig);
@@ -241,12 +254,12 @@
             }
         }
 
+        initSpark();
         // data partition sort and aggregation
         processDpp();
     }
 
     private void run() throws Exception {
-        initSparkEnvironment();
         initConfig();
         checkConfig();
         processData();
diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
index 8b94937805..0ea7f66092 100644
--- a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
+++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
@@ -31,14 +31,19 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import mockit.Expectations;
-import mockit.Injectable;
 import mockit.Mocked;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SparkSession;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,20 +97,15 @@
     }
 
     @Test
-    public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset<String> ds) {
+    public void testInitConfig(@Mocked FileSystem fs) throws IOException {
         new Expectations() {
             {
-                SparkSession.builder().enableHiveSupport().getOrCreate();
-                result = spark;
-                spark.read().textFile(anyString);
-                result = ds;
-                ds.first();
-                result = etlJobConfig.configToJson();
+                fs.open(new Path("hdfs://127.0.0.1:10000/jobconfig.json"));
+                result = new FSDataInputStream(new SeekableByteArrayInputStream(etlJobConfig.configToJson().getBytes()));
             }
         };
 
         SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
-        Deencapsulation.invoke(job, "initSparkEnvironment");
         Deencapsulation.invoke(job, "initConfig");
         EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig");
         Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
@@ -150,4 +150,45 @@
         // check remove v2 bitmap_dict func mapping from file group column mappings
         Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
     }
+
+    private static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+        public SeekableByteArrayInputStream(byte[] buf) {
+            super(buf);
+        }
+
+        public void seek(long position) {
+            if (position < 0 || position >= buf.length) {
+                throw new IllegalArgumentException("pos = " + position + " length = " + buf.length);
+            }
+            this.pos = (int) position;
+        }
+
+        public long getPos() {
+            return this.pos;
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) throws IOException {
+            return false;
+        }
+
+        @Override
+        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+            this.seek(position);
+            return this.read(buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+            if (position + length > buf.length) {
+                throw new EOFException("End of file reached before reading fully.");
+            }
+            System.arraycopy(buf, (int) position, buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer) throws IOException {
+            readFully(position, buffer, 0, buffer.length);
+        }
+    }
 }
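
For context: the patched initConfig() loads the job config JSON through the Hadoop FileSystem API instead of spark.read().textFile(), so the config can be read before a SparkSession exists. Below is a minimal standalone sketch of that read pattern, not taken from the patch; it assumes a plain Configuration in place of SparkHadoopUtil, a hypothetical class name JobConfigReadSketch, and a hypothetical local path.

    import com.google.common.io.CharStreams;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class JobConfigReadSketch {
        // Read the whole file into a String via the Hadoop FileSystem API;
        // the same code path handles hdfs://, file://, etc.
        static String readJobConfig(String configPath) throws IOException {
            Configuration hadoopConf = new Configuration(); // sketch only; the patch derives this from SparkConf
            Path path = new Path(configPath);
            try (FileSystem fs = path.getFileSystem(hadoopConf);
                    FSDataInputStream in = fs.open(path)) {
                return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical path for illustration; the real job receives jobConfigFilePath as an argument.
            System.out.println(readJobConfig("file:///tmp/jobconfig.json"));
        }
    }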