[fix](sparkdpp) Hive table properties not take effect when create spark session (#21881)

When creating a Hive external table for Spark loading, the Hive external table includes related information such as the Hive Metastore. However, when submitting a job, it is required to have the hive-site.xml file in the Spark conf directory; otherwise, the Spark job may fail with an error message indicating that the corresponding Hive table cannot be found.

The SparkEtlJob.initSparkConfigs method sets the properties of the external table into the Spark conf. However, at this point, the Spark session has already been created, and the Hive-related parameters will not take effect. To ensure that the Spark Hive catalog properly loads Hive tables, you need to set the Hive-related parameters before creating the Spark session.

Co-authored-by: zhangshixin <zhangshixin@youzan.com>
This commit is contained in:
HonestManXin
2023-07-20 14:36:00 +08:00
committed by GitHub
parent 2ae9bfa3b2
commit 365afb5389
2 changed files with 73 additions and 19 deletions

View File

@ -30,14 +30,21 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -62,6 +69,7 @@ public class SparkEtlJob {
private Set<Long> hiveSourceTables;
private Map<Long, Set<String>> tableToBitmapDictColumns;
private Map<Long, Set<String>> tableToBinaryBitmapColumns;
private final SparkConf conf;
private SparkSession spark;
private SparkEtlJob(String jobConfigFilePath) {
@ -70,10 +78,10 @@ public class SparkEtlJob {
this.hiveSourceTables = Sets.newHashSet();
this.tableToBitmapDictColumns = Maps.newHashMap();
this.tableToBinaryBitmapColumns = Maps.newHashMap();
conf = new SparkConf();
}
private void initSparkEnvironment() {
SparkConf conf = new SparkConf();
private void initSpark() {
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
@ -86,14 +94,19 @@ public class SparkEtlJob {
return;
}
for (Map.Entry<String, String> entry : configs.entrySet()) {
spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
conf.set(entry.getKey(), entry.getValue());
conf.set("spark.hadoop." + entry.getKey(), entry.getValue());
}
}
private void initConfig() {
private void initConfig() throws IOException {
LOG.debug("job config file path: " + jobConfigFilePath);
Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
String jsonConfig = ds.first();
Configuration hadoopConf = SparkHadoopUtil.get().newConfiguration(this.conf);
String jsonConfig;
Path path = new Path(jobConfigFilePath);
try (FileSystem fs = path.getFileSystem(hadoopConf); DataInputStream in = fs.open(path)) {
jsonConfig = CharStreams.toString(new InputStreamReader(in));
}
LOG.debug("rdd read json config: " + jsonConfig);
etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
LOG.debug("etl job config: " + etlJobConfig);
@ -241,12 +254,12 @@ public class SparkEtlJob {
}
}
initSpark();
// data partition sort and aggregation
processDpp();
}
private void run() throws Exception {
initSparkEnvironment();
initConfig();
checkConfig();
processData();

View File

@ -31,14 +31,19 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -92,20 +97,15 @@ public class SparkEtlJobTest {
}
@Test
public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset<String> ds) {
public void testInitConfig(@Mocked FileSystem fs) throws IOException {
new Expectations() {
{
SparkSession.builder().enableHiveSupport().getOrCreate();
result = spark;
spark.read().textFile(anyString);
result = ds;
ds.first();
result = etlJobConfig.configToJson();
fs.open(new Path("hdfs://127.0.0.1:10000/jobconfig.json"));
result = new FSDataInputStream(new SeekableByteArrayInputStream(etlJobConfig.configToJson().getBytes()));
}
};
SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
Deencapsulation.invoke(job, "initSparkEnvironment");
Deencapsulation.invoke(job, "initConfig");
EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig");
Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
@ -150,4 +150,45 @@ public class SparkEtlJobTest {
// check remove v2 bitmap_dict func mapping from file group column mappings
Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
}
private static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
public SeekableByteArrayInputStream(byte[] buf) {
super(buf);
}
public void seek(long position) {
if (position < 0 || position >= buf.length) {
throw new IllegalArgumentException("pos = " + position + " length = " + buf.length);
}
this.pos = (int) position;
}
public long getPos() {
return this.pos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
this.seek(position);
return this.read(buffer, offset, length);
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
if (position + length > buf.length) {
throw new EOFException("End of file reached before reading fully.");
}
System.arraycopy(buf, (int) position, buffer, offset, length);
}
@Override
public void readFully(long position, byte[] buffer) throws IOException {
readFully(position, buffer, 0, buffer.length);
}
}
}