[CodeRefactor] Modify FE modules (#4146)

This CL makes the following main changes:

1. Add two new FE modules

    1. fe-common

        Holds common classes shared by the other modules; currently it contains only the `jmockit` classes.

    2. spark-dpp

        The Spark DPP application for Spark Load. All DPP-related classes, including their unit tests, have been moved into this module.

2. Change `build.sh`

    Add a new parameter `--spark-dpp` to compile the `spark-dpp` module alone (e.g. `sh build.sh --spark-dpp`); `--fe` now compiles all FE modules.

    The output of the `spark-dpp` module is `spark-dpp-1.0.0-jar-with-dependencies.jar`, which is installed to `output/fe/spark-dpp/`.

3. Fix several bugs in Spark Load
Author: Mingyu Chen, 2020-07-29 16:18:05 +08:00 (committed by GitHub)
Parent: 1b3af783e6
Commit: 0e79f6908b
45 changed files with 878 additions and 140 deletions

.gitignore (4 lines changed)

@ -27,6 +27,8 @@ log/
fe_plugins/*/target
fe_plugins/output
fe/mocked
fe/*/target
dependency-reduced-pom.xml
#ignore eclipse project file & idea project file
@ -43,6 +45,8 @@ be/src/gen_cpp/*.cc
be/src/gen_cpp/*.cpp
be/src/gen_cpp/*.h
be/src/gen_cpp/opcode
be/ut_build_ASAN/
be/tags
#ignore vscode project file
.vscode

build.sh

@ -49,7 +49,8 @@ usage() {
Usage: $0 <options>
Optional options:
--be build Backend
--fe build Frontend
--fe build Frontend and Spark Dpp application
--spark-dpp build Spark DPP application
--clean clean and build target
--with-mysql enable MySQL support(default)
--without-mysql disable MySQL support
@ -57,12 +58,13 @@ Usage: $0 <options>
--without-lzo disable LZO compress support
Eg.
$0 build Backend and Frontend without clean
$0 build all
$0 --be build Backend without clean
$0 --be --without-mysql build Backend with MySQL disable
$0 --be --without-mysql --without-lzo build Backend with both MySQL and LZO disable
$0 --fe --clean clean and build Frontend
$0 --fe --be --clean clean and build both Frontend and Backend
$0 --fe --clean clean and build Frontend and Spark Dpp application
$0 --fe --be --clean clean and build Frontend, Spark Dpp application and Backend
$0 --spark-dpp build Spark DPP application alone
"
exit 1
}
@ -73,6 +75,7 @@ OPTS=$(getopt \
-o 'h' \
-l 'be' \
-l 'fe' \
-l 'spark-dpp' \
-l 'clean' \
-l 'with-mysql' \
-l 'without-mysql' \
@ -89,26 +92,30 @@ eval set -- "$OPTS"
BUILD_BE=
BUILD_FE=
BUILD_SPARK_DPP=
CLEAN=
RUN_UT=
WITH_MYSQL=ON
WITH_LZO=ON
HELP=0
if [ $# == 1 ] ; then
# defuat
# default
BUILD_BE=1
BUILD_FE=1
BUILD_SPARK_DPP=1
CLEAN=0
RUN_UT=0
else
BUILD_BE=0
BUILD_FE=0
BUILD_SPARK_DPP=0
CLEAN=0
RUN_UT=0
while true; do
case "$1" in
--be) BUILD_BE=1 ; shift ;;
--fe) BUILD_FE=1 ; shift ;;
--spark-dpp) BUILD_SPARK_DPP=1 ; shift ;;
--clean) CLEAN=1 ; shift ;;
--ut) RUN_UT=1 ; shift ;;
--with-mysql) WITH_MYSQL=ON; shift ;;
@ -128,18 +135,19 @@ if [[ ${HELP} -eq 1 ]]; then
exit
fi
if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 ]; then
echo "--clean can not be specified without --fe or --be"
if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 -a ${BUILD_SPARK_DPP} -eq 0 ]; then
echo "--clean can not be specified without --fe or --be or --spark-dpp"
exit 1
fi
echo "Get params:
BUILD_BE -- $BUILD_BE
BUILD_FE -- $BUILD_FE
CLEAN -- $CLEAN
RUN_UT -- $RUN_UT
WITH_MYSQL -- $WITH_MYSQL
WITH_LZO -- $WITH_LZO
BUILD_BE -- $BUILD_BE
BUILD_FE -- $BUILD_FE
BUILD_SPARK_DPP -- $BUILD_SPARK_DPP
CLEAN -- $CLEAN
RUN_UT -- $RUN_UT
WITH_MYSQL -- $WITH_MYSQL
WITH_LZO -- $WITH_LZO
"
# Clean and build generated code
@ -175,14 +183,25 @@ cd ${DORIS_HOME}/docs
./build_help_zip.sh
cd ${DORIS_HOME}
# Assemble FE modules
FE_MODULES=
if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
if [ ${BUILD_SPARK_DPP} -eq 1 ]; then
FE_MODULES="fe-common,spark-dpp"
fi
if [ ${BUILD_FE} -eq 1 ]; then
FE_MODULES="fe-common,spark-dpp,fe-core"
fi
fi
# Clean and build Frontend
if [ ${BUILD_FE} -eq 1 ] ; then
echo "Build Frontend"
if [ ${FE_MODULES}x != ""x ]; then
echo "Build Frontend Modules: $FE_MODULES"
cd ${DORIS_HOME}/fe
if [ ${CLEAN} -eq 1 ]; then
${MVN_CMD} clean
fi
${MVN_CMD} package -DskipTests
${MVN_CMD} package -pl ${FE_MODULES} -DskipTests
cd ${DORIS_HOME}
fi
@ -190,19 +209,29 @@ fi
DORIS_OUTPUT=${DORIS_HOME}/output/
mkdir -p ${DORIS_OUTPUT}
#Copy Frontend and Backend
if [ ${BUILD_FE} -eq 1 ]; then
install -d ${DORIS_OUTPUT}/fe/bin ${DORIS_OUTPUT}/fe/conf \
${DORIS_OUTPUT}/fe/webroot/ ${DORIS_OUTPUT}/fe/lib/
# Copy Frontend and Backend
if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
if [ ${BUILD_FE} -eq 1 ]; then
install -d ${DORIS_OUTPUT}/fe/bin ${DORIS_OUTPUT}/fe/conf \
${DORIS_OUTPUT}/fe/webroot/ ${DORIS_OUTPUT}/fe/lib/ \
${DORIS_OUTPUT}/fe/spark-dpp/
cp -r -p ${DORIS_HOME}/bin/*_fe.sh ${DORIS_OUTPUT}/fe/bin/
cp -r -p ${DORIS_HOME}/conf/fe.conf ${DORIS_OUTPUT}/fe/conf/
rm -rf ${DORIS_OUTPUT}/fe/lib/*
cp -r -p ${DORIS_HOME}/fe/fe-core/target/lib/* ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/fe/fe-core/target/palo-fe.jar ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/docs/build/help-resource.zip ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/webroot/* ${DORIS_OUTPUT}/fe/webroot/
cp -r -p ${DORIS_HOME}/bin/*_fe.sh ${DORIS_OUTPUT}/fe/bin/
cp -r -p ${DORIS_HOME}/conf/fe.conf ${DORIS_OUTPUT}/fe/conf/
rm -rf ${DORIS_OUTPUT}/fe/lib/*
cp -r -p ${DORIS_HOME}/fe/fe-core/target/lib/* ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/fe/fe-core/target/palo-fe.jar ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/docs/build/help-resource.zip ${DORIS_OUTPUT}/fe/lib/
cp -r -p ${DORIS_HOME}/webroot/* ${DORIS_OUTPUT}/fe/webroot/
cp -r -p ${DORIS_HOME}/fe/spark-dpp/target/spark-dpp-*-jar-with-dependencies.jar ${DORIS_OUTPUT}/fe/spark-dpp/
elif [ ${BUILD_SPARK_DPP} -eq 1 ]; then
install -d ${DORIS_OUTPUT}/fe/spark-dpp/
rm -rf ${DORIS_OUTPUT}/fe/spark-dpp/*
cp -r -p ${DORIS_HOME}/fe/spark-dpp/target/spark-dpp-*-jar-with-dependencies.jar ${DORIS_OUTPUT}/fe/spark-dpp/
fi
fi
if [ ${BUILD_BE} -eq 1 ]; then
install -d ${DORIS_OUTPUT}/be/bin \
${DORIS_OUTPUT}/be/conf \

fe/README (new file, 14 lines)

@ -0,0 +1,14 @@
# fe-common
This module stores common classes shared by the other modules.
# spark-dpp
This module is the Spark DPP program, used by the Spark Load feature.
Depends on: fe-common
# fe-core
This module is the main FE process module.
Depends on: fe-common, spark-dpp

fe/fe-common/pom.xml (new file, 68 lines)

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache</groupId>
<artifactId>doris-fe</artifactId>
<version>3.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>fe-common</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<doris.home>${basedir}/../../</doris.home>
</properties>
<build>
<plugins>
<!-- for FE java code style checking -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
<excludes>**/jmockit/**/*</excludes>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

fe/fe-core/pom.xml

@ -30,7 +30,7 @@ under the License.
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>doris-fe-core</artifactId>
<artifactId>fe-core</artifactId>
<version>3.4.0</version>
<packaging>jar</packaging>
@ -40,6 +40,11 @@ under the License.
</properties>
<dependencies>
<dependency>
<groupId>org.apache</groupId>
<artifactId>spark-dpp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/cglib/cglib -->
<dependency>
<groupId>cglib</groupId>

Config.java

@ -514,7 +514,7 @@ public class Config extends ConfigBase {
* Default spark dpp version
*/
@ConfField
public static String spark_dpp_version = "1_0_0";
public static String spark_dpp_version = "1.0.0";
/**
* Default spark load timeout
*/
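
For context, `Config.spark_dpp_version` is embedded in the DPP jar file name, which is why it now uses dots instead of underscores; a minimal sketch of the naming (mirroring SPARK_DPP_JAR in SparkRepository below; the class name is illustrative):

    public class JarNameDemo {
        public static void main(String[] args) {
            String sparkDppVersion = "1.0.0"; // value of Config.spark_dpp_version above
            // same pattern as SPARK_DPP_JAR in SparkRepository
            System.out.println("spark-dpp-" + sparkDppVersion + "-jar-with-dependencies.jar");
        }
    }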

SparkEtlJobHandler.java

@ -53,6 +53,7 @@ import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.apache.spark.launcher.SparkLauncher;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
@ -78,10 +79,14 @@ public class SparkEtlJobHandler {
class SparkAppListener implements Listener {
@Override
public void stateChanged(SparkAppHandle sparkAppHandle) {}
public void stateChanged(SparkAppHandle sparkAppHandle) {
LOG.info("get spark state changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId());
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle) {}
public void infoChanged(SparkAppHandle sparkAppHandle) {
LOG.info("get spark info changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId());
}
}
public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource,
@ -134,7 +139,9 @@ public class SparkEtlJobHandler {
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.setSparkHome(sparkHome)
.addAppArgs(jobConfigHdfsPath);
.addAppArgs(jobConfigHdfsPath)
.redirectError()
.redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log"));
// spark configs
for (Map.Entry<String, String> entry : resource.getSparkConfigs().entrySet()) {

SparkRepository.java

@ -60,14 +60,14 @@ public class SparkRepository {
public static final String REPOSITORY_DIR = "__spark_repository__";
public static final String PREFIX_ARCHIVE = "__archive_";
public static final String PREFIX_LIB = "__lib_";
public static final String SPARK_DPP = "spark-dpp";
public static final String SPARK_DPP_JAR = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies.jar";
public static final String SPARK_DPP = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies";
public static final String SPARK_2X = "spark-2x";
public static final String SUFFIX = ".zip";
private static final String PATH_DELIMITER = "/";
private static final String FILE_NAME_SEPARATOR = "_";
private static final String DPP_RESOURCE = "/spark-dpp/spark-dpp.jar";
private static final String DPP_RESOURCE_DIR = "/spark-dpp/";
private static final String SPARK_RESOURCE = "/jars/spark-2x.zip";
private String remoteRepositoryPath;
@ -85,7 +85,7 @@ public class SparkRepository {
this.brokerDesc = brokerDesc;
this.currentDppVersion = Config.spark_dpp_version;
this.currentArchive = new SparkArchive(getRemoteArchivePath(currentDppVersion), currentDppVersion);
this.localDppPath = PaloFe.DORIS_HOME_DIR + DPP_RESOURCE;
this.localDppPath = PaloFe.DORIS_HOME_DIR + DPP_RESOURCE_DIR + SPARK_DPP_JAR;
if (!Strings.isNullOrEmpty(Config.spark_resource_path)) {
this.localSpark2xPath = Config.spark_resource_path;
} else {
@ -98,7 +98,7 @@ public class SparkRepository {
}
private void initRepository() throws LoadException {
LOG.info("start to init remote repository");
LOG.info("start to init remote repositoryi. local dpp: {}", this.localDppPath);
boolean needUpload = false;
boolean needReplace = false;
CHECK: {
@ -222,7 +222,11 @@ public class SparkRepository {
if (!fileName.startsWith(PREFIX_LIB)) {
continue;
}
String[] lib_arg = unWrap(PREFIX_LIB, SUFFIX, fileName).split(FILE_NAME_SEPARATOR);
// fileName should look like:
// __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar
// __lib_md5sum_spark-2x.zip
String[] lib_arg = unwrap(PREFIX_LIB, fileName).split(FILE_NAME_SEPARATOR);
if (lib_arg.length != 2) {
continue;
}
@ -232,16 +236,12 @@ public class SparkRepository {
}
String type = lib_arg[1];
SparkLibrary.LibType libType = null;
switch (type) {
case SPARK_DPP:
libType = SparkLibrary.LibType.DPP;
break;
case SPARK_2X:
libType = SparkLibrary.LibType.SPARK2X;
break;
default:
Preconditions.checkState(false, "wrong library type: " + type);
break;
if (type.equals(SPARK_DPP)) {
libType = SparkLibrary.LibType.DPP;
} else if (type.equals(SPARK_2X)) {
libType = SparkLibrary.LibType.SPARK2X;
} else {
throw new LoadException("Invalid library type: " + type);
}
SparkLibrary remoteFile = new SparkLibrary(fileStatus.path, md5sum, libType, fileStatus.size);
libraries.add(remoteFile);
@ -259,7 +259,7 @@ public class SparkRepository {
LOG.debug("get md5sum from file {}, md5sum={}", filePath, md5sum);
return md5sum;
} catch (FileNotFoundException e) {
throw new LoadException("file " + filePath + "dose not exist");
throw new LoadException("file " + filePath + " does not exist");
} catch (IOException e) {
throw new LoadException("failed to get md5sum from file " + filePath);
}
@ -302,8 +302,11 @@ public class SparkRepository {
return path.substring(path.lastIndexOf(delimiter) + 1);
}
private static String unWrap(String prefix, String suffix, String fileName) {
return fileName.substring(prefix.length(), fileName.length() - suffix.length());
// input: __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar
// output: md5sum_spark-dpp-1.0.0-jar-with-dependencies
private static String unwrap(String prefix, String fileName) {
int pos = fileName.lastIndexOf(".");
return fileName.substring(prefix.length(), pos);
}
private static String joinPrefix(String prefix, String fileName) {
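
A small, self-contained sketch of the repository file-name convention described above (the md5sum value and class name are illustrative):

    public class LibNameDemo {
        public static void main(String[] args) {
            // remote library files are named __lib_<md5sum>_<library-file-name>
            String fileName = "__lib_9f86d08_spark-dpp-1.0.0-jar-with-dependencies.jar";
            // unwrap(): strip the "__lib_" prefix and the file extension
            String unwrapped = fileName.substring("__lib_".length(), fileName.lastIndexOf("."));
            // split into [md5sum, library name]; the library name itself contains no underscores
            String[] libArg = unwrapped.split("_");
            System.out.println(libArg[0]); // 9f86d08
            System.out.println(libArg[1]); // spark-dpp-1.0.0-jar-with-dependencies
        }
    }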

fe/pom.xml

@ -29,6 +29,8 @@ under the License.
<packaging>pom</packaging>
<modules>
<module>fe-common</module>
<module>spark-dpp</module>
<module>fe-core</module>
</modules>
@ -108,6 +110,18 @@ under the License.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache</groupId>
<artifactId>fe-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache</groupId>
<artifactId>spark-dpp</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/cglib/cglib -->
<dependency>
<groupId>cglib</groupId>
@ -573,6 +587,62 @@ under the License.
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

fe/spark-dpp/pom.xml (new file, 430 lines)

@ -0,0 +1,430 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache</groupId>
<artifactId>doris-fe</artifactId>
<version>3.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>spark-dpp</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<doris.home>${basedir}/../../</doris.home>
<fe_ut_parallel>1</fe_ut_parallel>
</properties>
<dependencies>
<dependency>
<groupId>org.apache</groupId>
<artifactId>fe-common</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jmockit/jmockit -->
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<!-- spark -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
<build>
<finalName>spark-dpp-${version}</finalName>
<plugins>
<!-- jmockit -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<!-->set larger, eg, 3, to reduce the time or running FE unit tests<-->
<forkCount>${fe_ut_parallel}</forkCount>
<!-->not reuse forked jvm, so that each unit test will run in separate jvm. to avoid singleton confict<-->
<reuseForks>false</reuseForks>
<argLine>
-javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar
</argLine>
</configuration>
</plugin>
<!-- copy all dependency libs to target lib dir -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<skip>${skip.plugin}</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.doris.load.loadv2.etl.SparkEtlJob</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>2.7</version>
<configuration>
<check>
<maxmem>1024m</maxmem>
</check>
</configuration>
</plugin>
<!-- for FE java code style checking -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- clean fe/target dir before building -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>auto-clean</id>
<phase>initialize</phase>
<goals>
<goal>clean</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.slf4j:*</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>org.apache.doris.shaded.org.roaringbitmap</shadedPattern>
<pattern>com.google.guava</pattern>
<shadedPattern>org.apache.doris.shaded.com.google.guava</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--set for ecplise lifecycle -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>net.sourceforge.czt.dev</groupId>
<artifactId>cup-maven-plugin</artifactId>
<versionRange>[1.6,)</versionRange>
<goals>
<goal>generate</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>de.jflex</groupId>
<artifactId>maven-jflex-plugin</artifactId>
<versionRange>[1.4.3,)</versionRange>
<goals>
<goal>generate</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<versionRange>[1.6,)</versionRange>
<goals>
<goal>exec</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<versionRange>[1.7,)</versionRange>
<goals>
<goal>add-source</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>copy-dependencies</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<versionRange>[2.6,)</versionRange>
<goals>
<goal>resources</goal>
<goal>testResources</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

Codec.java (new file)

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Codec {
// encoding negative values is not supported yet
public static void encodeVarint64(long source, DataOutput out) throws IOException {
assert source >= 0;
short B = 128;
while (source >= B) {
out.write((int) (source & (B - 1) | B));
source = source >> 7;
}
out.write((int) (source & (B - 1)));
}
// decoding negative values is not supported yet
public static long decodeVarint64(DataInput in) throws IOException {
long result = 0;
int shift = 0;
short B = 128;
while (true) {
int oneByte = in.readUnsignedByte();
boolean isEnd = (oneByte & B) == 0;
result = result | ((long) (oneByte & B - 1) << (shift * 7));
if (isEnd) {
break;
}
shift++;
}
return result;
}
}
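
A quick round-trip sketch for the codec above (the wrapper class name is illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class CodecDemo {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // encode a non-negative long into the variable-length format
            Codec.encodeVarint64(123456789L, new DataOutputStream(buffer));
            // decode it back; prints 123456789
            System.out.println(Codec.decodeVarint64(
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))));
        }
    }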

SparkDppException.java (new file)

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import com.google.common.base.Strings;
// Exception for Spark DPP process
public class SparkDppException extends Exception {
public SparkDppException(String msg, Throwable cause) {
super(Strings.nullToEmpty(msg), cause);
}
public SparkDppException(Throwable cause) {
super(cause);
}
public SparkDppException(String msg, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(Strings.nullToEmpty(msg), cause, enableSuppression, writableStackTrace);
}
public SparkDppException(String msg) {
super(Strings.nullToEmpty(msg));
}
}


@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
package org.apache.doris.load.loadv2.dpp;
import org.roaringbitmap.Util;

ColumnParser.java

@ -17,7 +17,7 @@
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.common.UserException;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.joda.time.DateTime;
@ -26,7 +26,7 @@ import java.util.Date;
// Parser to validate value for different type
public abstract class ColumnParser implements Serializable {
public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws UserException {
public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
String columnType = etlColumn.columnType;
if (columnType.equalsIgnoreCase("TINYINT")) {
return new TinyIntParser();
@ -52,7 +52,7 @@ public abstract class ColumnParser implements Serializable {
|| columnType.equalsIgnoreCase("HLL")) {
return new StringParser(etlColumn);
} else {
throw new UserException("unsupported type:" + columnType);
throw new SparkDppException("unsupported type:" + columnType);
}
}

DorisKryoRegistrator.java

@ -27,7 +27,7 @@ public class DorisKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
kryo.register(org.apache.doris.load.loadv2.BitmapValue.class);
kryo.register(Roaring64Map.class);
kryo.register(BitmapValue.class);
}
}

DppUtils.java

@ -17,7 +17,7 @@
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.common.UserException;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.spark.sql.types.DataType;
@ -69,7 +69,7 @@ public class DppUtils {
return null;
}
public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws UserException {
public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws SparkDppException {
switch (column.columnType) {
case "BOOLEAN":
return Boolean.class;
@ -83,7 +83,7 @@ public class DppUtils {
case "BIGINT":
return Long.class;
case "LARGEINT":
throw new UserException("LARGEINT is not supported now");
throw new SparkDppException("LARGEINT is not supported now");
case "FLOAT":
return Float.class;
case "DOUBLE":
@ -213,14 +213,14 @@ public class DppUtils {
return dstSchema;
}
public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) throws UserException {
public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) throws SparkDppException {
if (columnsFromPath == null || columnsFromPath.isEmpty()) {
return Collections.emptyList();
}
String[] strings = filePath.split("/");
if (strings.length < 2) {
System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
}
String[] columns = new String[columnsFromPath.size()];
int size = 0;
@ -231,12 +231,12 @@ public class DppUtils {
}
if (str == null || !str.contains("=")) {
System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
}
String[] pair = str.split("=", 2);
if (pair.length != 2) {
System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
}
int index = columnsFromPath.indexOf(pair[0]);
if (index == -1) {
@ -250,7 +250,7 @@ public class DppUtils {
}
if (size != columnsFromPath.size()) {
System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
}
return Lists.newArrayList(columns);
}


@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
package org.apache.doris.load.loadv2.dpp;
import org.apache.commons.codec.binary.StringUtils;

Roaring64Map.java

@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.common.Codec;
import org.apache.doris.load.loadv2.dpp.BitmapValue;
import org.roaringbitmap.BitmapDataProvider;
import org.roaringbitmap.BitmapDataProviderSupplier;
@ -43,10 +46,6 @@ import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.apache.doris.common.util.Util.decodeVarint64;
import static org.apache.doris.common.util.Util.encodeVarint64;
import static org.apache.doris.load.loadv2.BitmapValue.BITMAP32;
import static org.apache.doris.load.loadv2.BitmapValue.BITMAP64;
/**
*
@ -1316,13 +1315,13 @@ public class Roaring64Map {
return;
}
if (is32BitsEnough()) {
out.write(BITMAP32);
out.write(BitmapValue.BITMAP32);
highToBitmap.get(0).serialize(out);
return;
}
out.write(BITMAP64);
encodeVarint64(highToBitmap.size(), out);
out.write(BitmapValue.BITMAP64);
Codec.encodeVarint64(highToBitmap.size(), out);
for (Map.Entry<Integer, BitmapDataProvider> entry : highToBitmap.entrySet()) {
out.writeInt(entry.getKey().intValue());
@ -1347,8 +1346,8 @@ public class Roaring64Map {
highToBitmap = new TreeMap<>();
long nbHighs = 1;
if (bitmapType == BITMAP64) {
nbHighs = decodeVarint64(in);
if (bitmapType == BitmapValue.BITMAP64) {
nbHighs = Codec.decodeVarint64(in);
}
for (int i = 0; i < nbHighs; i++) {

SparkDpp.java

@ -17,8 +17,7 @@
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.common.UserException;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import com.google.common.base.Strings;
@ -73,6 +72,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.util.SerializableConfiguration;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@ -104,6 +104,11 @@ public final class SparkDpp implements java.io.Serializable {
private Map<String, Integer> bucketKeyMap = new HashMap<>();
// accumulator to collect invalid rows
private StringAccumulator invalidRows = new StringAccumulator();
// Save the Hadoop configuration from the Spark session.
// Because the Hadoop configuration is not serializable,
// we wrap it so that it can be used in executors.
private SerializableConfiguration serializableHadoopConf;
public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
this.spark = spark;
@ -117,9 +122,10 @@ public final class SparkDpp implements java.io.Serializable {
fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
spark.sparkContext().register(invalidRows, "InvalidRowsAccumulator");
this.serializableHadoopConf = new SerializableConfiguration(spark.sparkContext().hadoopConfiguration());
}
private Dataset<Row> processRDDAggAndRepartition(Dataset<Row> dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws UserException {
private Dataset<Row> processRDDAggAndRepartition(Dataset<Row> dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws SparkDppException {
final boolean isDuplicateTable = !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "AGGREGATE")
&& !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "UNIQUE");
@ -184,7 +190,7 @@ public final class SparkDpp implements java.io.Serializable {
private void writePartitionedAndSortedDataframeToParquet(Dataset<Row> dataframe,
String pathPattern,
long tableId,
EtlJobConfig.EtlIndex indexMeta) throws UserException {
EtlJobConfig.EtlIndex indexMeta) throws SparkDppException {
StructType outputSchema = dataframe.schema();
StructType dstSchema = DataTypes.createStructType(
Arrays.asList(outputSchema.fields()).stream()
@ -195,7 +201,7 @@ public final class SparkDpp implements java.io.Serializable {
@Override
public void call(Iterator<Row> t) throws Exception {
// write the data to dst file
Configuration conf = new Configuration();
Configuration conf = new Configuration(serializableHadoopConf.value());
FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf);
String lastBucketKey = null;
ParquetWriter<InternalRow> parquetWriter = null;
@ -278,7 +284,7 @@ public final class SparkDpp implements java.io.Serializable {
private void processRollupTree(RollupTreeNode rootNode,
Dataset<Row> rootDataframe,
long tableId, EtlJobConfig.EtlTable tableMeta,
EtlJobConfig.EtlIndex baseIndex) throws UserException {
EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
Queue<RollupTreeNode> nodeQueue = new LinkedList<>();
nodeQueue.offer(rootNode);
int currentLevel = 0;
@ -351,7 +357,7 @@ public final class SparkDpp implements java.io.Serializable {
List<String> valueColumnNames,
StructType dstTableSchema,
EtlJobConfig.EtlIndex baseIndex,
List<Long> validPartitionIds) throws UserException {
List<Long> validPartitionIds) throws SparkDppException {
List<String> distributeColumns = partitionInfo.distributionColumnRefs;
Partitioner partitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndex, partitionRangeKeys);
Set<Integer> validPartitionIndex = new HashSet<>();
@ -440,7 +446,7 @@ public final class SparkDpp implements java.io.Serializable {
private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex,
Dataset<Row> srcDataframe,
StructType dstTableSchema,
EtlJobConfig.EtlFileGroup fileGroup) throws UserException {
EtlJobConfig.EtlFileGroup fileGroup) throws SparkDppException {
Dataset<Row> dataframe = srcDataframe;
StructType srcSchema = dataframe.schema();
Set<String> srcColumnNames = new HashSet<>();
@ -471,7 +477,7 @@ public final class SparkDpp implements java.io.Serializable {
} else if (column.isAllowNull) {
dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
} else {
throw new UserException("Reason: no data for column:" + dstField.name());
throw new SparkDppException("Reason: no data for column:" + dstField.name());
}
}
if (column.columnType.equalsIgnoreCase("DATE")) {
@ -492,7 +498,7 @@ public final class SparkDpp implements java.io.Serializable {
// 2. process the mapping columns
for (String mappingColumn : mappingColumns) {
String mappingDescription = columnMappings.get(mappingColumn).toDescription();
if (mappingDescription.toLowerCase().contains(FunctionSet.HLL_HASH)) {
if (mappingDescription.toLowerCase().contains("hll_hash")) {
continue;
}
// here should cast data type to dst column type
@ -517,7 +523,7 @@ public final class SparkDpp implements java.io.Serializable {
EtlJobConfig.EtlFileGroup fileGroup,
String fileUrl,
EtlJobConfig.EtlIndex baseIndex,
List<EtlJobConfig.EtlColumn> columns) throws UserException {
List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
List<String> dataSrcColumns = fileGroup.fileFieldNames;
if (dataSrcColumns == null) {
@ -630,7 +636,7 @@ public final class SparkDpp implements java.io.Serializable {
// partition keys will be parsed into double from json
// so need to convert it to partition columns' type
private Object convertPartitionKey(Object srcValue, Class dstClass) throws UserException {
private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException {
if (dstClass.equals(Float.class) || dstClass.equals(Double.class)) {
return null;
}
@ -655,7 +661,7 @@ public final class SparkDpp implements java.io.Serializable {
}
} else {
LOG.warn("unsupport partition key:" + srcValue);
throw new UserException("unsupport partition key:" + srcValue);
throw new SparkDppException("unsupport partition key:" + srcValue);
}
}
@ -685,7 +691,7 @@ public final class SparkDpp implements java.io.Serializable {
}
private List<DorisRangePartitioner.PartitionRangeKey> createPartitionRangeKeys(
EtlJobConfig.EtlPartitionInfo partitionInfo, List<Class> partitionKeySchema) throws UserException {
EtlJobConfig.EtlPartitionInfo partitionInfo, List<Class> partitionKeySchema) throws SparkDppException {
List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey();
@ -716,14 +722,13 @@ public final class SparkDpp implements java.io.Serializable {
List<String> filePaths,
EtlJobConfig.EtlFileGroup fileGroup,
StructType dstTableSchema)
throws UserException, IOException, URISyntaxException {
throws SparkDppException, IOException, URISyntaxException {
Dataset<Row> fileGroupDataframe = null;
for (String filePath : filePaths) {
fileNumberAcc.add(1);
try {
Configuration conf = new Configuration();
URI uri = new URI(filePath);
FileSystem fs = FileSystem.get(uri, conf);
FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
fileSizeAcc.add(fileStatus.getLen());
} catch (Exception e) {
@ -732,7 +737,7 @@ public final class SparkDpp implements java.io.Serializable {
}
if (fileGroup.columnSeparator == null) {
LOG.warn("invalid null column separator!");
throw new UserException("Reason: invalid null column separator!");
throw new SparkDppException("Reason: invalid null column separator!");
}
Dataset<Row> dataframe = null;
@ -751,7 +756,7 @@ public final class SparkDpp implements java.io.Serializable {
String hiveDbTableName,
EtlJobConfig.EtlIndex baseIndex,
EtlJobConfig.EtlFileGroup fileGroup,
StructType dstTableSchema) throws UserException {
StructType dstTableSchema) throws SparkDppException {
// select base index columns from hive table
StringBuilder sql = new StringBuilder();
sql.append("select ");
@ -876,9 +881,8 @@ public final class SparkDpp implements java.io.Serializable {
DppResult dppResult = process();
String outputPath = etlJobConfig.getOutputPath();
String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
Configuration conf = new Configuration();
URI uri = new URI(outputPath);
FileSystem fs = FileSystem.get(uri, conf);
FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
Path filePath = new Path(resultFilePath);
FSDataOutputStream outputStream = fs.create(filePath);
Gson gson = new Gson();
@ -886,4 +890,5 @@ public final class SparkDpp implements java.io.Serializable {
outputStream.write('\n');
outputStream.close();
}
}
}
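
The serializableHadoopConf change above follows a common Spark pattern: wrap the non-serializable Hadoop Configuration on the driver, then unwrap it with value() inside executor-side closures. A minimal sketch (Spark's SerializableConfiguration is Scala package-private but callable from Java, as the code above already does; names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.util.SerializableConfiguration;

    public class SerializableConfDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local").appName("demo").getOrCreate();
            // driver side: wrap the Hadoop configuration so closures can capture it
            final SerializableConfiguration wrappedConf =
                    new SerializableConfiguration(spark.sparkContext().hadoopConfiguration());
            spark.range(3).javaRDD().foreachPartition(rows -> {
                // executor side: unwrap with value() to get a usable Configuration
                Configuration conf = new Configuration(wrappedConf.value());
                System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
            });
            spark.stop();
        }
    }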

SparkRDDAggregator.java

@ -18,9 +18,7 @@
package org.apache.doris.load.loadv2.dpp;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.BitmapValue;
import org.apache.doris.load.loadv2.Hll;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.function.Function2;
@ -55,7 +53,7 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
};
// TODO(wb) support more datatype:decimal,date,datetime
public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws UserException {
public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws SparkDppException {
String aggType = StringUtils.lowerCase(column.aggregationType);
String columnType = StringUtils.lowerCase(column.columnType);
switch (aggType) {
@ -78,7 +76,7 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
case "largeint":
return new LargeIntMaxAggregator();
default:
throw new UserException(String.format("unsupported max aggregator for column type:%s", columnType));
throw new SparkDppException(String.format("unsupported max aggregator for column type:%s", columnType));
}
case "min":
switch (columnType) {
@ -95,7 +93,7 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
case "largeint":
return new LargeIntMinAggregator();
default:
throw new UserException(String.format("unsupported min aggregator for column type:%s", columnType));
throw new SparkDppException(String.format("unsupported min aggregator for column type:%s", columnType));
}
case "sum":
switch (columnType) {
@ -114,14 +112,14 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
case "largeint":
return new LargeIntSumAggregator();
default:
throw new UserException(String.format("unsupported sum aggregator for column type:%s", columnType));
throw new SparkDppException(String.format("unsupported sum aggregator for column type:%s", columnType));
}
case "replace_if_not_null":
return new ReplaceIfNotNullAggregator();
case "replace":
return new ReplaceAggregator();
default:
throw new UserException(String.format("unsupported aggregate type %s", aggType));
throw new SparkDppException(String.format("unsupported aggregate type %s", aggType));
}
}

EtlJobConfig.java

@ -17,10 +17,10 @@
package org.apache.doris.load.loadv2.etl;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableMap;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
@ -217,7 +217,7 @@ public class EtlJobConfig implements Serializable {
public String configToJson() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy());
gsonBuilder.addDeserializationExclusionStrategy(new HiddenAnnotationExclusionStrategy());
Gson gson = gsonBuilder.create();
return gson.toJson(this);
}
@ -599,4 +599,15 @@ public class EtlJobConfig implements Serializable {
'}';
}
}
public static class HiddenAnnotationExclusionStrategy implements ExclusionStrategy {
public boolean shouldSkipField(FieldAttributes f) {
return f.getAnnotation(SerializedName.class) == null;
}
@Override
public boolean shouldSkipClass(Class<?> clazz) {
return false;
}
}
}
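
A small sketch of what the HiddenAnnotationExclusionStrategy above does during serialization (the Demo POJO is hypothetical):

    import com.google.gson.GsonBuilder;
    import com.google.gson.annotations.SerializedName;

    public class ExclusionDemo {
        // hypothetical POJO: only fields annotated with @SerializedName survive
        static class Demo {
            @SerializedName("kept")
            int kept = 1;
            int dropped = 2; // no @SerializedName, so the strategy skips it
        }

        public static void main(String[] args) {
            String json = new GsonBuilder()
                    .setExclusionStrategies(new EtlJobConfig.HiddenAnnotationExclusionStrategy())
                    .create()
                    .toJson(new Demo());
            System.out.println(json); // {"kept":1}
        }
    }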

BitmapValueTest.java

@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.load.loadv2.dpp.BitmapValue;
import org.apache.doris.common.Codec;
import org.junit.Assert;
import org.junit.Test;
@ -27,8 +30,6 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import static org.apache.doris.common.util.Util.decodeVarint64;
import static org.apache.doris.common.util.Util.encodeVarint64;
import static org.junit.Assert.assertEquals;
public class BitmapValueTest {
@ -39,8 +40,8 @@ public class BitmapValueTest {
for (long value : sourceValue) {
ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(byteArrayOutput);
encodeVarint64(value, output);
assertEquals(value, decodeVarint64(new DataInputStream(new ByteArrayInputStream(byteArrayOutput.toByteArray()))));
Codec.encodeVarint64(value, output);
assertEquals(value, Codec.decodeVarint64(new DataInputStream(new ByteArrayInputStream(byteArrayOutput.toByteArray()))));
}
}

HllTest.java

@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.load.loadv2.dpp.Hll;
import org.junit.Assert;
import org.junit.Test;
@ -27,16 +29,14 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import static org.apache.doris.load.loadv2.Hll.*;
public class HllTest {
@Test
public void testFindFirstNonZeroBitPosition() {
Assert.assertTrue(getLongTailZeroNum(0) == 0);
Assert.assertTrue(getLongTailZeroNum(1) == 0);
Assert.assertTrue(getLongTailZeroNum(1l << 30) == 30);
Assert.assertTrue(getLongTailZeroNum(1l << 62) == 62);
Assert.assertTrue(Hll.getLongTailZeroNum(0) == 0);
Assert.assertTrue(Hll.getLongTailZeroNum(1) == 0);
Assert.assertTrue(Hll.getLongTailZeroNum(1l << 30) == 30);
Assert.assertTrue(Hll.getLongTailZeroNum(1l << 62) == 62);
}
@Test
@ -44,7 +44,7 @@ public class HllTest {
// test empty
Hll emptyHll = new Hll();
Assert.assertTrue(emptyHll.getType() == HLL_DATA_EMPTY);
Assert.assertTrue(emptyHll.getType() == Hll.HLL_DATA_EMPTY);
Assert.assertTrue(emptyHll.estimateCardinality() == 0);
ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream();
@ -53,15 +53,15 @@ public class HllTest {
DataInputStream emptyInputStream = new DataInputStream(new ByteArrayInputStream(emptyOutputStream.toByteArray()));
Hll deserializedEmptyHll = new Hll();
deserializedEmptyHll.deserialize(emptyInputStream);
Assert.assertTrue(deserializedEmptyHll.getType() == HLL_DATA_EMPTY);
Assert.assertTrue(deserializedEmptyHll.getType() == Hll.HLL_DATA_EMPTY);
// test explicit
Hll explicitHll = new Hll();
for (int i = 0; i < HLL_EXPLICLIT_INT64_NUM; i++) {
for (int i = 0; i < Hll.HLL_EXPLICLIT_INT64_NUM; i++) {
explicitHll.updateWithHash(i);
}
Assert.assertTrue(explicitHll.getType() == HLL_DATA_EXPLICIT);
Assert.assertTrue(explicitHll.estimateCardinality() == HLL_EXPLICLIT_INT64_NUM);
Assert.assertTrue(explicitHll.getType() == Hll.HLL_DATA_EXPLICIT);
Assert.assertTrue(explicitHll.estimateCardinality() == Hll.HLL_EXPLICLIT_INT64_NUM);
ByteArrayOutputStream explicitOutputStream = new ByteArrayOutputStream();
DataOutput explicitOutput = new DataOutputStream(explicitOutputStream);
@ -69,17 +69,17 @@ public class HllTest {
DataInputStream explicitInputStream = new DataInputStream(new ByteArrayInputStream(explicitOutputStream.toByteArray()));
Hll deserializedExplicitHll = new Hll();
deserializedExplicitHll.deserialize(explicitInputStream);
Assert.assertTrue(deserializedExplicitHll.getType() == HLL_DATA_EXPLICIT);
Assert.assertTrue(deserializedExplicitHll.getType() == Hll.HLL_DATA_EXPLICIT);
// test sparse
Hll sparseHll = new Hll();
for (int i = 0; i < HLL_SPARSE_THRESHOLD; i++) {
for (int i = 0; i < Hll.HLL_SPARSE_THRESHOLD; i++) {
sparseHll.updateWithHash(i);
}
Assert.assertTrue(sparseHll.getType() == HLL_DATA_FULL);
Assert.assertTrue(sparseHll.getType() == Hll.HLL_DATA_FULL);
// 2% error rate
Assert.assertTrue(sparseHll.estimateCardinality() > HLL_SPARSE_THRESHOLD * (1 - 0.02) &&
sparseHll.estimateCardinality() < HLL_SPARSE_THRESHOLD * (1 + 0.02));
Assert.assertTrue(sparseHll.estimateCardinality() > Hll.HLL_SPARSE_THRESHOLD * (1 - 0.02) &&
sparseHll.estimateCardinality() < Hll.HLL_SPARSE_THRESHOLD * (1 + 0.02));
ByteArrayOutputStream sparseOutputStream = new ByteArrayOutputStream();
DataOutput sparseOutput = new DataOutputStream(sparseOutputStream);
@ -87,7 +87,7 @@ public class HllTest {
DataInputStream sparseInputStream = new DataInputStream(new ByteArrayInputStream(sparseOutputStream.toByteArray()));
Hll deserializedSparseHll = new Hll();
deserializedSparseHll.deserialize(sparseInputStream);
Assert.assertTrue(deserializedSparseHll.getType() == HLL_DATA_SPARSE);
Assert.assertTrue(deserializedSparseHll.getType() == Hll.HLL_DATA_SPARSE);
Assert.assertTrue(sparseHll.estimateCardinality() == deserializedSparseHll.estimateCardinality());
@ -96,7 +96,7 @@ public class HllTest {
for (int i = 1; i <= Short.MAX_VALUE; i++) {
fullHll.updateWithHash(i);
}
Assert.assertTrue(fullHll.getType() == HLL_DATA_FULL);
Assert.assertTrue(fullHll.getType() == Hll.HLL_DATA_FULL);
// the result 32748 is consistent with C++ 's implementation
Assert.assertTrue(fullHll.estimateCardinality() == 32748);
Assert.assertTrue(fullHll.estimateCardinality() > Short.MAX_VALUE * (1 - 0.02) &&
@ -108,7 +108,7 @@ public class HllTest {
DataInputStream fullHllInputStream = new DataInputStream(new ByteArrayInputStream(fullHllOutputStream.toByteArray()));
Hll deserializedFullHll = new Hll();
deserializedFullHll.deserialize(fullHllInputStream);
Assert.assertTrue(deserializedFullHll.getType() == HLL_DATA_FULL);
Assert.assertTrue(deserializedFullHll.getType() == Hll.HLL_DATA_FULL);
Assert.assertTrue(deserializedFullHll.estimateCardinality() == fullHll.estimateCardinality());
}
@ -158,11 +158,11 @@ public class HllTest {
long preValue = sparseHll.estimateCardinality();
// check serialize
byte[] serializedHll = serializeHll(sparseHll);
Assert.assertTrue(serializedHll.length < HLL_REGISTERS_COUNT + 1);
Assert.assertTrue(serializedHll.length < Hll.HLL_REGISTERS_COUNT + 1);
sparseHll = deserializeHll(serializedHll);
Assert.assertTrue(sparseHll.estimateCardinality() == preValue);
Assert.assertTrue(sparseHll.getType() == HLL_DATA_SPARSE);
Assert.assertTrue(sparseHll.getType() == Hll.HLL_DATA_SPARSE);
Hll otherHll = new Hll();
for (int i = 0; i < 1024; i++) {
@ -190,7 +190,7 @@ public class HllTest {
byte[] serializedHll = serializeHll(fullHll);
fullHll = deserializeHll(serializedHll);
Assert.assertTrue(fullHll.estimateCardinality() == preValue);
Assert.assertTrue(serializedHll.length == HLL_REGISTERS_COUNT + 1);
Assert.assertTrue(serializedHll.length == Hll.HLL_REGISTERS_COUNT + 1);
// 2% error rate
Assert.assertTrue(preValue > 62 * 1024 && preValue < 66 * 1024);

run-fe-ut.sh

@ -75,9 +75,6 @@ fi
echo "Build Frontend UT"
rm ${DORIS_HOME}/fe/build/ -rf
rm ${DORIS_HOME}/fe/output/ -rf
echo "******************************"
echo " Runing DorisFe Unittest "
echo "******************************"