[Spark Load] Add spark etl job main class (#3927)

1. Add SparkEtlJob class
2. Remove DppResult comment
3. Support loading from hive table directly

#3433
Author: wyb
Date: 2020-06-24 13:54:55 +08:00 (committed by GitHub)
Parent: 93a0b47d22
Commit: 3f7307d685
33 changed files with 556 additions and 125 deletions

View File

@ -944,9 +944,9 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
return OLAP_ERR_PUSH_INIT_ERROR;
}
_runtime_profile.reset(_runtime_state->runtime_profile());
_runtime_profile = _runtime_state->runtime_profile();
_runtime_profile->set_name("PushBrokerReader");
_mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
_mem_tracker.reset(new MemTracker(_runtime_profile, -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
_mem_pool.reset(new MemPool(_mem_tracker.get()));
_counter.reset(new ScannerCounter());
@ -955,7 +955,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
switch (t_scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new ParquetScanner(_runtime_state.get(),
_runtime_profile.get(),
_runtime_profile,
t_scan_range.params,
t_scan_range.ranges,
t_scan_range.broker_addresses,

View File

@ -247,7 +247,7 @@ private:
Tuple* _tuple;
const Schema* _schema;
std::unique_ptr<RuntimeState> _runtime_state;
std::unique_ptr<RuntimeProfile> _runtime_profile;
RuntimeProfile* _runtime_profile;
std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<ScannerCounter> _counter;

View File

@ -94,9 +94,6 @@ public class CreateTableStmt extends DdlStmt {
// for backup. set to -1 for normal use
private int tableSignature;
// TODO(wyb): spark-load
private static boolean disableHiveTable = true;
public CreateTableStmt() {
// for persist
tableName = new TableName();
@ -259,7 +256,7 @@ public class CreateTableStmt extends DdlStmt {
analyzeEngineName();
// TODO(wyb): spark-load
if (engineName.equals("hive") && disableHiveTable) {
if (engineName.equals("hive") && !Config.enable_spark_load) {
throw new AnalysisException("Spark Load from hive table is comming soon");
}
// analyze key desc

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
@ -46,9 +47,6 @@ public class GrantStmt extends DdlStmt {
private ResourcePattern resourcePattern;
private List<PaloPrivilege> privileges;
// TODO(wyb): spark-load
public static boolean disableGrantResource = true;
public GrantStmt(UserIdentity userIdent, String role, TablePattern tblPattern, List<AccessPrivilege> privileges) {
this.userIdent = userIdent;
this.role = role;
@ -111,7 +109,7 @@ public class GrantStmt extends DdlStmt {
tblPattern.analyze(analyzer.getClusterName());
} else {
// TODO(wyb): spark-load
if (disableGrantResource) {
if (!Config.enable_spark_load) {
throw new AnalysisException("GRANT ON RESOURCE is comming soon");
}
resourcePattern.analyze();

View File

@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
@ -100,9 +101,6 @@ public class LoadStmt extends DdlStmt {
private String version = "v2";
// TODO(wyb): spark-load
public static boolean disableSparkLoad = true;
// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(TIMEOUT_PROPERTY)
@ -288,7 +286,7 @@ public class LoadStmt extends DdlStmt {
resourceDesc.analyze();
etlJobType = resourceDesc.getEtlJobType();
// TODO(wyb): spark-load
if (disableSparkLoad) {
if (!Config.enable_spark_load) {
throw new AnalysisException("Spark Load is comming soon");
}
// check resource usage privilege

View File

@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.mysql.privilege.PaloPrivilege;
import org.apache.doris.mysql.privilege.PrivBitSet;
@ -98,7 +99,7 @@ public class RevokeStmt extends DdlStmt {
tblPattern.analyze(analyzer.getClusterName());
} else {
// TODO(wyb): spark-load
if (GrantStmt.disableGrantResource) {
if (!Config.enable_spark_load) {
throw new AnalysisException("REVOKE ON RESOURCE is comming soon");
}
resourcePattern.analyze();

View File

@ -1460,6 +1460,7 @@ public class Catalog {
checksum = loadGlobalVariable(dis, checksum);
checksum = loadCluster(dis, checksum);
checksum = loadBrokers(dis, checksum);
checksum = loadResources(dis, checksum);
checksum = loadExportJob(dis, checksum);
checksum = loadBackupHandler(dis, checksum);
checksum = loadPaloAuth(dis, checksum);
@ -1468,8 +1469,6 @@ public class Catalog {
checksum = loadColocateTableIndex(dis, checksum);
checksum = loadRoutineLoadJobs(dis, checksum);
checksum = loadLoadJobsV2(dis, checksum);
// TODO(wyb): spark-load
//checksum = loadResources(dis, checksum);
checksum = loadSmallFiles(dis, checksum);
checksum = loadPlugins(dis, checksum);
checksum = loadDeleteHandler(dis, checksum);
@ -1872,13 +1871,10 @@ public class Catalog {
}
public long loadResources(DataInputStream in, long checksum) throws IOException {
// TODO(wyb): spark-load
/*
if (MetaContext.get().getMetaVersion() >= FeMetaVersion.new_version_by_wyb) {
resourceMgr = ResourceMgr.read(in);
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
resourceMgr = ResourceMgr.read(in);
}
LOG.info("finished replay resources from image");
*/
return checksum;
}
@ -1927,6 +1923,7 @@ public class Catalog {
checksum = saveGlobalVariable(dos, checksum);
checksum = saveCluster(dos, checksum);
checksum = saveBrokers(dos, checksum);
checksum = saveResources(dos, checksum);
checksum = saveExportJob(dos, checksum);
checksum = saveBackupHandler(dos, checksum);
checksum = savePaloAuth(dos, checksum);
@ -1934,8 +1931,6 @@ public class Catalog {
checksum = saveColocateTableIndex(dos, checksum);
checksum = saveRoutineLoadJobs(dos, checksum);
checksum = saveLoadJobsV2(dos, checksum);
// TODO(wyb): spark-load
//checksum = saveResources(dos, checksum);
checksum = saveSmallFiles(dos, checksum);
checksum = savePlugins(dos, checksum);
checksum = saveDeleteHandler(dos, checksum);

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@ -88,12 +89,12 @@ public class ResourceMgr implements Writable {
}
// log drop
Catalog.getCurrentCatalog().getEditLog().logDropResource(name);
Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(name));
LOG.info("drop resource success. resource name: {}", name);
}
public void replayDropResource(String name) {
nameToResource.remove(name);
public void replayDropResource(DropResourceOperationLog operationLog) {
nameToResource.remove(operationLog.getName());
}
public boolean containsResource(String name) {

View File

@ -1088,5 +1088,11 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean drop_backend_after_decommission = true;
/*
* enable spark load for temporary use
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_spark_load = false;
}
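
The new flag defaults to false, so all the Spark Load entry points gated above stay disabled out of the box. Since the field is marked mutable and masterOnly, it should be adjustable at runtime on the master FE via Doris's statement for mutable configs (value shown is illustrative):

ADMIN SET FRONTEND CONFIG ("enable_spark_load" = "true");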

View File

@ -183,6 +183,8 @@ public final class FeMetaVersion {
public static final int VERSION_85 = 85;
// serialize origStmt in rollupJob and mv meta
public static final int VERSION_86 = 86;
// spark resource, resource privilege, broker file group for hive table
public static final int VERSION_87 = 87;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_86;
public static final int VERSION_CURRENT = VERSION_87;
}
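
VERSION_87 gates replay of the new resource metadata: readers consume the extra bytes only when the image or journal was written at or above this version. The same guard recurs in Catalog.loadResources, BrokerFileGroup, PaloAuth, PaloRole and PrivInfo below; the shape, in a minimal sketch:

// read side: only consume fields added in VERSION_87 when the writer had them
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
    // e.g. resourcePrivTable = (ResourcePrivTable) PrivTable.read(in);
}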

View File

@ -58,6 +58,7 @@ import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@ -511,8 +512,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_DROP_RESOURCE: {
data = new Text();
((Text) data).readFields(in);
data = DropResourceOperationLog.read(in);
isRead = true;
break;
}

View File

@ -358,11 +358,8 @@ public class BrokerFileGroup implements Writable {
}
// src table
// TODO(wyb): spark-load
/*
out.writeLong(srcTableId);
out.writeBoolean(isLoadFromTable);
*/
}
public void readFields(DataInput in) throws IOException {
@ -414,13 +411,10 @@ public class BrokerFileGroup implements Writable {
}
}
// src table
// TODO(wyb): spark-load
/*
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version) {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
srcTableId = in.readLong();
isLoadFromTable = in.readBoolean();
}
*/
// There is no columnExprList in previous load jobs, which were created before functions were supported.
// The columnExprList cannot be analyzed without the origin stmt of the previous load job.

View File

@ -20,7 +20,7 @@ package org.apache.doris.load;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
//import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.thrift.TEtlState;
import com.google.common.base.Strings;
@ -46,7 +46,7 @@ public class EtlStatus implements Writable {
// 0 - 100
private int progress;
private String failMsg;
//private DppResult dppResult;
private DppResult dppResult;
public EtlStatus() {
this.state = TEtlState.RUNNING;
@ -56,7 +56,7 @@ public class EtlStatus implements Writable {
this.fileMap = Maps.newHashMap();
this.progress = 0;
this.failMsg = "";
//this.dppResult = null;
this.dppResult = null;
}
public TEtlState getState() {
@ -128,8 +128,6 @@ public class EtlStatus implements Writable {
this.failMsg = failMsg;
}
// TODO(wyb): spark-load
/*
public DppResult getDppResult() {
return dppResult;
}
@ -137,7 +135,6 @@ public class EtlStatus implements Writable {
public void setDppResult(DppResult dppResult) {
this.dppResult = dppResult;
}
*/
public void reset() {
this.stats.clear();
@ -145,7 +142,7 @@ public class EtlStatus implements Writable {
this.fileMap.clear();
this.progress = 0;
this.failMsg = "";
//this.dppResult = null;
this.dppResult = null;
}
@Override
@ -158,7 +155,7 @@ public class EtlStatus implements Writable {
", fileMap=" + fileMap +
", progress=" + progress +
", failMsg='" + failMsg + '\'' +
//", dppResult='" + dppResult + '\'' +
", dppResult='" + dppResult + '\'' +
'}';
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
@ -248,7 +249,7 @@ public class BrokerLoadJob extends BulkLoadJob {
// check data quality
if (!checkDataQuality()) {
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
true, true);
return;
}

View File

@ -82,7 +82,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
private static final Logger LOG = LogManager.getLogger(LoadJob.class);
protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel";
protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL";
protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
public static final String UNSELECTED_ROWS = "unselected.rows";

View File

@ -17,10 +17,6 @@
package org.apache.doris.load.loadv2;
import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
@ -159,7 +155,7 @@ public class LoadManager implements Writable{
return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), false,
loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
false /* no need to write edit log, because createLoadJob log is not written yet */);
}
throw e;
@ -403,11 +399,11 @@ public class LoadManager implements Writable{
((SparkLoadJob) job).updateEtlStatus();
} catch (DataQualityException e) {
LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
true, true);
} catch (UserException e) {
LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(ETL_RUN_FAIL, e.getMessage()), true, true);
job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
} catch (Exception e) {
LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
}
@ -422,7 +418,7 @@ public class LoadManager implements Writable{
((SparkLoadJob) job).updateLoadingStatus();
} catch (UserException e) {
LOG.warn("update load job loading status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), true, true);
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
} catch (Exception e) {
LOG.warn("update load job loading status failed. job id: {}", job.getId(), e);
}

View File

@ -25,8 +25,9 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.load.EtlStatus;
//import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.load.loadv2.etl.SparkEtlJob;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;
@ -46,12 +47,12 @@ import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.apache.spark.launcher.SparkLauncher;
//import com.google.common.base.Strings;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
//import com.google.gson.Gson;
//import com.google.gson.JsonSyntaxException;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@ -73,7 +74,6 @@ public class SparkEtlJobHandler {
private static final String CONFIG_FILE_NAME = "jobconfig.json";
private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME;
private static final String JOB_CONFIG_DIR = "configs";
private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob";
private static final String ETL_JOB_NAME = "doris__%s";
// 5min
private static final int GET_APPID_MAX_RETRY_TIMES = 300;
@ -112,10 +112,7 @@ public class SparkEtlJobHandler {
launcher.setMaster(resource.getMaster())
.setDeployMode(resource.getDeployMode().name().toLowerCase())
.setAppResource(appResourceHdfsPath)
// TODO(wyb): spark-load
// replace with getCanonicalName later
//.setMainClass(SparkEtlJob.class.getCanonicalName())
.setMainClass(MAIN_CLASS)
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.addAppArgs(jobConfigHdfsPath);
// spark configs
@ -220,8 +217,6 @@ public class SparkEtlJobHandler {
if (status.getState() == TEtlState.FINISHED || status.getState() == TEtlState.CANCELLED) {
// get dpp result
// TODO(wyb): spark-load
/*
String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath);
try {
byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
@ -234,7 +229,6 @@ public class SparkEtlJobHandler {
} catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
}
*/
}
return status;

View File

@ -17,8 +17,6 @@
package org.apache.doris.load.loadv2;
import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DescriptorTable;
@ -58,7 +56,7 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
//import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.FrontendOptions;
@ -76,7 +74,7 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
//import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TabletQuorumFailedException;
@ -313,8 +311,6 @@ public class SparkLoadJob extends BulkLoadJob {
loadingStatus.setTrackingUrl(appId);
}
// TODO(wyb): spark-load
/*
DppResult dppResult = etlStatus.getDppResult();
if (dppResult != null) {
// update load statistic and counters when spark etl job finished
@ -331,14 +327,13 @@ public class SparkLoadJob extends BulkLoadJob {
counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
}
*/
}
private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler handler) throws Exception {
unprotectedUpdateEtlStatusInternal(etlStatus);
// checkDataQuality
if (!checkDataQuality()) {
throw new DataQualityException(QUALITY_FAIL_MSG);
throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
}
// get etl output files and update loading state

View File

@ -20,10 +20,14 @@ package org.apache.doris.load.loadv2.dpp;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
/**
* Registers ETL classes with Kryo when Kryo serialization is enabled.
*/
public class DorisKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
kryo.register(org.apache.doris.load.loadv2.BitmapValue.class);
}
}
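
The registrator only takes effect when Kryo serialization is turned on and the registrator class is set on the SparkConf, which is exactly what SparkEtlJob.initSparkEnvironment does later in this commit; a minimal sketch:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
SparkSession spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();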

View File

@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.Column;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
@ -200,7 +200,7 @@ public class GlobalDictBuilder {
maxDictValue = (long)row.get(0);
minDictValue = (long)row.get(1);
}
LOG.info(" column {} 's max value in dict is {} , min value is {}", distinctColumnNameTmp, maxDictValue, minDictValue);
LOG.info(" column " + distinctColumnNameTmp + " 's max value in dict is " + maxDictValue + ", min value is " + minDictValue);
// may never happen, but we need to detect it
if (minDictValue < 0) {
throw new RuntimeException(String.format(" column %s 's cardinality has exceeded bigint's max value", distinctColumnNameTmp));

View File

@ -746,11 +746,18 @@ public final class SparkDpp implements java.io.Serializable {
}
private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
String hiveTableName,
String hiveDbTableName,
EtlJobConfig.EtlIndex baseIndex,
EtlJobConfig.EtlFileGroup fileGroup,
StructType dstTableSchema) throws UserException {
Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName);
// select base index columns from hive table
StringBuilder sql = new StringBuilder();
sql.append("select ");
baseIndex.columns.forEach(column -> {
sql.append(column.columnName).append(",");
});
sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName);
Dataset<Row> dataframe = spark.sql(sql.toString());
dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
return dataframe;
}
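
With base index columns k1, k2, v1 (as in the test setup below) and an illustrative hive table db0.tbl0, the builder above produces:

select k1,k2,v1 from db0.tbl0

so dpp now reads only the base-index columns from hive instead of the previous select *.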
@ -805,13 +812,13 @@ public final class SparkDpp implements java.io.Serializable {
for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
List<String> filePaths = fileGroup.filePaths;
Dataset<Row> fileGroupDataframe = null;
if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) {
EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
if (sourceType == EtlJobConfig.SourceType.FILE) {
fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
} else if (sourceType == EtlJobConfig.SourceType.HIVE) {
fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
} else {
String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME,
tableId, taskId);
fileGroupDataframe = loadDataFromHiveTable(spark, dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema);
throw new RuntimeException("Unknown source type: " + sourceType.name());
}
if (fileGroupDataframe == null) {
LOG.info("no data for file file group:" + fileGroup);

View File

@ -486,6 +486,10 @@ public class EtlJobConfig implements Serializable {
@SerializedName(value = "hiveTableProperties")
public Map<String, String> hiveTableProperties;
// hive db table used in dpp, not serialized
// set to hiveDbTableName (no bitmap columns) or to the intermediate hive table created by the global dict builder in the spark etl job
public String dppHiveDbTableName;
// for data infile path
public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String> fileFieldNames,
List<String> columnsFromPath, String columnSeparator, String lineDelimiter,

View File

@ -0,0 +1,250 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2.etl;
import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
import org.apache.doris.load.loadv2.dpp.SparkDpp;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* SparkEtlJob is responsible for global dict building, data partition, data sort and data aggregation.
* 1. init job config
* 2. check if job has bitmap_dict function columns
* 3. build global dict if step 2 is true
* 4. dpp (data partition, data sort and data aggregation)
*/
public class SparkEtlJob {
private static final Logger LOG = LogManager.getLogger(SparkEtlJob.class);
private static final String BITMAP_DICT_FUNC = "bitmap_dict";
private static final String TO_BITMAP_FUNC = "to_bitmap";
private String jobConfigFilePath;
private EtlJobConfig etlJobConfig;
private Set<Long> hiveSourceTables;
private Map<Long, Set<String>> tableToBitmapDictColumns;
private SparkSession spark;
private SparkEtlJob(String jobConfigFilePath) {
this.jobConfigFilePath = jobConfigFilePath;
this.etlJobConfig = null;
this.hiveSourceTables = Sets.newHashSet();
this.tableToBitmapDictColumns = Maps.newHashMap();
}
private void initSparkEnvironment() {
SparkConf conf = new SparkConf();
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "false");
spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();
}
private void initSparkConfigs(Map<String, String> configs) {
if (configs == null) {
return;
}
for (Map.Entry<String, String> entry : configs.entrySet()) {
spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
}
}
private void initConfig() {
LOG.info("job config file path: " + jobConfigFilePath);
Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
String jsonConfig = ds.first();
LOG.info("rdd read json config: " + jsonConfig);
etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
LOG.info("etl job config: " + etlJobConfig);
}
/*
* 1. check bitmap column
* 2. fill tableToBitmapDictColumns
* 3. remove bitmap_dict and to_bitmap mapping from columnMappings
*/
private void checkConfig() throws Exception {
for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) {
boolean isHiveSource = false;
Set<String> bitmapDictColumns = Sets.newHashSet();
for (EtlFileGroup fileGroup : entry.getValue().fileGroups) {
if (fileGroup.sourceType == EtlJobConfig.SourceType.HIVE) {
isHiveSource = true;
}
Map<String, EtlColumnMapping> newColumnMappings = Maps.newHashMap();
for (Map.Entry<String, EtlColumnMapping> mappingEntry : fileGroup.columnMappings.entrySet()) {
String columnName = mappingEntry.getKey();
String exprStr = mappingEntry.getValue().toDescription();
String funcName = functions.expr(exprStr).expr().prettyName();
if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
bitmapDictColumns.add(columnName);
} else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
}
}
// reset new columnMappings
fileGroup.columnMappings = newColumnMappings;
}
if (isHiveSource) {
hiveSourceTables.add(entry.getKey());
}
if (!bitmapDictColumns.isEmpty()) {
tableToBitmapDictColumns.put(entry.getKey(), bitmapDictColumns);
}
}
LOG.info("init hiveSourceTables: " + hiveSourceTables + ", tableToBitmapDictColumns: " + tableToBitmapDictColumns);
// a spark etl job may process at most one hive source table and one table with bitmap dict columns
if (hiveSourceTables.size() > 1 || tableToBitmapDictColumns.size() > 1) {
throw new Exception("spark etl job must have only one hive table with bitmap type columns to process");
}
}
private void processDpp() throws Exception {
SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
sparkDpp.init();
sparkDpp.doDpp();
}
private String buildGlobalDictAndEncodeSourceTable(EtlTable table, long tableId) {
// dict column map
MultiValueMap dictColumnMap = new MultiValueMap();
for (String dictColumn : tableToBitmapDictColumns.get(tableId)) {
dictColumnMap.put(dictColumn, null);
}
// doris schema
List<String> dorisOlapTableColumnList = Lists.newArrayList();
for (EtlIndex etlIndex : table.indexes) {
if (etlIndex.isBaseIndex) {
for (EtlColumn column : etlIndex.columns) {
dorisOlapTableColumnList.add(column.columnName);
}
}
}
// hive db and tables
EtlFileGroup fileGroup = table.fileGroups.get(0);
String sourceHiveDBTableName = fileGroup.hiveDbTableName;
String dorisHiveDB = sourceHiveDBTableName.split("\\.")[0];
String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
String globalDictTableName = String.format(EtlJobConfig.GLOBAL_DICT_TABLE_NAME, tableId);
String distinctKeyTableName = String.format(EtlJobConfig.DISTINCT_KEY_TABLE_NAME, tableId, taskId);
String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, tableId, taskId);
String sourceHiveFilter = fileGroup.where;
// others
List<String> mapSideJoinColumns = Lists.newArrayList();
int buildConcurrency = 1;
List<String> veryHighCardinalityColumn = Lists.newArrayList();
int veryHighCardinalityColumnSplitNum = 1;
LOG.info("global dict builder args, dictColumnMap: " + dictColumnMap
+ ", dorisOlapTableColumnList: " + dorisOlapTableColumnList
+ ", sourceHiveDBTableName: " + sourceHiveDBTableName
+ ", sourceHiveFilter: "+ sourceHiveFilter
+ ", distinctKeyTableName: " + distinctKeyTableName
+ ", globalDictTableName: " + globalDictTableName
+ ", dorisIntermediateHiveTable: " + dorisIntermediateHiveTable);
try {
GlobalDictBuilder globalDictBuilder = new GlobalDictBuilder(
dictColumnMap, dorisOlapTableColumnList, mapSideJoinColumns, sourceHiveDBTableName,
sourceHiveFilter, dorisHiveDB, distinctKeyTableName, globalDictTableName, dorisIntermediateHiveTable,
buildConcurrency, veryHighCardinalityColumn, veryHighCardinalityColumnSplitNum, spark);
globalDictBuilder.createHiveIntermediateTable();
globalDictBuilder.extractDistinctColumn();
globalDictBuilder.buildGlobalDict();
globalDictBuilder.encodeDorisIntermediateHiveTable();
} catch (Exception e) {
throw new RuntimeException(e);
}
return String.format("%s.%s", dorisHiveDB, dorisIntermediateHiveTable);
}
private void processData() throws Exception {
if (!hiveSourceTables.isEmpty()) {
// only one table
long tableId = -1;
EtlTable table = null;
for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) {
tableId = entry.getKey();
table = entry.getValue();
break;
}
// init hive configs like metastore service
EtlFileGroup fileGroup = table.fileGroups.get(0);
initSparkConfigs(fileGroup.hiveTableProperties);
fileGroup.dppHiveDbTableName = fileGroup.hiveDbTableName;
// build global dict and encode the source hive table if it has bitmap dict columns
if (!tableToBitmapDictColumns.isEmpty() && tableToBitmapDictColumns.containsKey(tableId)) {
String dorisIntermediateHiveDbTableName = buildGlobalDictAndEncodeSourceTable(table, tableId);
// set with dorisIntermediateHiveDbTable
fileGroup.dppHiveDbTableName = dorisIntermediateHiveDbTableName;
}
}
// data partition sort and aggregation
processDpp();
}
private void run() throws Exception {
initSparkEnvironment();
initConfig();
checkConfig();
processData();
}
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("missing job config file path arg");
System.exit(-1);
}
try {
new SparkEtlJob(args[0]).run();
} catch (Exception e) {
System.err.println("spark etl job run failed");
e.printStackTrace();
System.exit(-1);
}
}
}
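
As SparkEtlJobHandler above shows, this class is submitted through Spark's launcher API with the job config path as its single program argument; a minimal sketch with illustrative master, deploy mode and paths:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

SparkAppHandle handle = new SparkLauncher()
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setAppResource("hdfs://127.0.0.1:10000/lib/app.jar")        // illustrative app resource path
        .setMainClass(SparkEtlJob.class.getCanonicalName())
        .setAppName("doris__label0")                                 // ETL_JOB_NAME pattern: doris__%s
        .addAppArgs("hdfs://127.0.0.1:10000/configs/jobconfig.json")
        .startApplication();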

View File

@ -36,6 +36,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.DppConfig;
@ -1312,8 +1313,7 @@ public class PaloAuth implements Writable {
userPrivTable.write(out);
dbPrivTable.write(out);
tablePrivTable.write(out);
// TODO(wyb): spark-load
//resourcePrivTable.write(out);
resourcePrivTable.write(out);
propertyMgr.write(out);
}
@ -1322,12 +1322,9 @@ public class PaloAuth implements Writable {
userPrivTable = (UserPrivTable) PrivTable.read(in);
dbPrivTable = (DbPrivTable) PrivTable.read(in);
tablePrivTable = (TablePrivTable) PrivTable.read(in);
// TODO(wyb): spark-load
/*
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
resourcePrivTable = (ResourcePrivTable) PrivTable.read(in);
}
*/
propertyMgr = UserPropertyMgr.read(in);
if (userPrivTable.isEmpty()) {

View File

@ -20,6 +20,8 @@ package org.apache.doris.mysql.privilege;
import org.apache.doris.analysis.ResourcePattern;
import org.apache.doris.analysis.TablePattern;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@ -142,14 +144,11 @@ public class PaloRole implements Writable {
entry.getKey().write(out);
entry.getValue().write(out);
}
// TODO(wyb): spark-load
/*
out.writeInt(resourcePatternToPrivs.size());
for (Map.Entry<ResourcePattern, PrivBitSet> entry : resourcePatternToPrivs.entrySet()) {
entry.getKey().write(out);
entry.getValue().write(out);
}
*/
out.writeInt(users.size());
for (UserIdentity userIdentity : users) {
userIdentity.write(out);
@ -164,9 +163,7 @@ public class PaloRole implements Writable {
PrivBitSet privs = PrivBitSet.read(in);
tblPatternToPrivs.put(tblPattern, privs);
}
// TODO(wyb): spark-load
/*
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
size = in.readInt();
for (int i = 0; i < size; i++) {
ResourcePattern resourcePattern = ResourcePattern.read(in);
@ -174,7 +171,6 @@ public class PaloRole implements Writable {
resourcePatternToPrivs.put(resourcePattern, privs);
}
}
*/
size = in.readInt();
for (int i = 0; i < size; i++) {
UserIdentity userIdentity = UserIdentity.read(in);

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* For resource drop
*/
public class DropResourceOperationLog implements Writable {
@SerializedName(value = "name")
private String name;
public DropResourceOperationLog(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static DropResourceOperationLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), DropResourceOperationLog.class);
}
}
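
The log round-trips as GSON JSON wrapped in a Text string, matching how EditLog replays OP_DROP_RESOURCE below; a hedged sketch of the write/read cycle over in-memory streams (streams are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

DropResourceOperationLog log = new DropResourceOperationLog("spark0");
ByteArrayOutputStream bos = new ByteArrayOutputStream();
log.write(new DataOutputStream(bos));                   // serializes {"name":"spark0"}
DropResourceOperationLog replayed =
        DropResourceOperationLog.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
// replayed.getName() returns "spark0" for ResourceMgr.replayDropResource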

View File

@ -696,8 +696,8 @@ public class EditLog {
break;
}
case OperationType.OP_DROP_RESOURCE: {
final String resourceName = journal.getData().toString();
catalog.getResourceMgr().replayDropResource(resourceName);
final DropResourceOperationLog operationLog = (DropResourceOperationLog) journal.getData();
catalog.getResourceMgr().replayDropResource(operationLog);
break;
}
case OperationType.OP_CREATE_SMALL_FILE: {
@ -1277,13 +1277,11 @@ public class EditLog {
}
public void logCreateResource(Resource resource) {
// TODO(wyb): spark-load
//logEdit(OperationType.OP_CREATE_RESOURCE, resource);
logEdit(OperationType.OP_CREATE_RESOURCE, resource);
}
public void logDropResource(String resourceName) {
// TODO(wyb): spark-load
//logEdit(OperationType.OP_DROP_RESOURCE, new Text(resourceName));
public void logDropResource(DropResourceOperationLog operationLog) {
logEdit(OperationType.OP_DROP_RESOURCE, operationLog);
}
public void logCreateSmallFile(SmallFile info) {

View File

@ -20,6 +20,8 @@ package org.apache.doris.persist;
import org.apache.doris.analysis.ResourcePattern;
import org.apache.doris.analysis.TablePattern;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PrivBitSet;
@ -117,15 +119,12 @@ public class PrivInfo implements Writable {
out.writeBoolean(false);
}
// TODO(wyb): spark-load
/*
if (resourcePattern != null) {
out.writeBoolean(true);
resourcePattern.write(out);
} else {
out.writeBoolean(false);
}
*/
if (privs != null) {
out.writeBoolean(true);
@ -159,14 +158,11 @@ public class PrivInfo implements Writable {
tblPattern = TablePattern.read(in);
}
// TODO(wyb): spark-load
/*
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
if (in.readBoolean()) {
resourcePattern = ResourcePattern.read(in);
}
}
*/
if (in.readBoolean()) {
privs = PrivBitSet.read(in);

View File

@ -21,6 +21,7 @@ import mockit.Expectations;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.qe.ConnectContext;
@ -97,7 +98,7 @@ public class GrantStmtTest {
@Test
public void testResourceNormal() throws UserException {
// TODO(wyb): spark-load
GrantStmt.disableGrantResource = false;
Config.enable_spark_load = true;
String resourceName = "spark0";
List<AccessPrivilege> privileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV);

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ResourceMgr;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.privilege.PaloAuth;
@ -107,7 +108,7 @@ public class LoadStmtTest {
// test ResourceDesc
// TODO(wyb): spark-load
LoadStmt.disableSparkLoad = false;
Config.enable_spark_load = true;
stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList,
new ResourceDesc(resourceName, null), null);
stmt.analyze(analyzer);

View File

@ -157,9 +157,8 @@ public class SparkEtlJobHandlerTest {
result = trackingUrl;
report.getProgress();
returns(0.5f, 1f, 1f);
// TODO(wyb): spark-load
//BrokerUtil.readFile(anyString, (BrokerDesc) any);
//result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
BrokerUtil.readFile(anyString, (BrokerDesc) any);
result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
}
};
@ -180,17 +179,15 @@ public class SparkEtlJobHandlerTest {
status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc);
Assert.assertEquals(TEtlState.CANCELLED, status.getState());
Assert.assertEquals(100, status.getProgress());
// TODO(wyb): spark-load
//Assert.assertEquals("etl job failed", status.getDppResult().failedReason);
Assert.assertEquals("etl job failed", status.getDppResult().failedReason);
// finished
status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc);
Assert.assertEquals(TEtlState.FINISHED, status.getState());
Assert.assertEquals(100, status.getProgress());
Assert.assertEquals(trackingUrl, status.getTrackingUrl());
// TODO(wyb): spark-load
//Assert.assertEquals(10, status.getDppResult().normalRows);
//Assert.assertEquals(0, status.getDppResult().abnormalRows);
Assert.assertEquals(10, status.getDppResult().normalRows);
Assert.assertEquals(0, status.getDppResult().abnormalRows);
}
@Test

View File

@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2.etl;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SparkEtlJobTest {
private long tableId;
private long index1Id;
private long index2Id;
private long partition1Id;
private long partition2Id;
private EtlJobConfig etlJobConfig;
@Before
public void setUp() {
tableId = 0L;
index1Id = 1L;
index2Id = 2L;
partition1Id = 3L;
partition2Id = 4L;
// indexes
EtlColumn k1 = new EtlColumn("k1", "INT", false, true, "NONE", "0", 0, 0, 0);
EtlColumn k2 = new EtlColumn("k2", "VARCHAR", false, true, "NONE", "0", 10, 0, 0);
EtlColumn v1 = new EtlColumn("v1", "BIGINT", false, false, "NONE", "0", 0, 0, 0);
EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2, v1), 666666, "DUPLICATE", true);
v1 = new EtlColumn("v1", "BIGINT", false, false, "SUM", "0", 0, 0, 0);
EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1), 888888, "AGGREGATE", true);
List<EtlIndex> indexes = Lists.newArrayList(index1, index2);
// partition info
List<EtlPartition> partitions = Lists.newArrayList();
partitions.add(new EtlPartition(partition1Id, Lists.newArrayList(0), Lists.newArrayList(100), false, 2));
partitions.add(new EtlPartition(partition2Id, Lists.newArrayList(100), Lists.newArrayList(), true, 3));
EtlPartitionInfo partitionInfo = new EtlPartitionInfo("RANGE", Lists.newArrayList("k1"), Lists.newArrayList("k2"), partitions);
EtlTable table = new EtlTable(indexes, partitionInfo);
// file group
Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap();
columnMappings.put("k1", new EtlColumnMapping("k1 + 1"));
table.addFileGroup(new EtlFileGroup(EtlJobConfig.SourceType.FILE, Lists.newArrayList("hdfs://127.0.0.1:10000/file"),
Lists.newArrayList(), Lists.newArrayList(), "\t", "\n", false, null,
Maps.newHashMap(), "", Lists.newArrayList(partition1Id, partition2Id)));
// tables
Map<Long, EtlTable> tables = Maps.newHashMap();
tables.put(tableId, table);
// others
String outputFilePattern = "V1.label0.%d.%d.%d.%d.%d.parquet";
String label = "label0";
EtlJobProperty properties = new EtlJobProperty();
properties.strictMode = false;
properties.timezone = "Asia/Shanghai";
etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label, properties);
}
@Test
public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset<String> ds) {
new Expectations() {
{
SparkSession.builder().enableHiveSupport().getOrCreate();
result = spark;
spark.read().textFile(anyString);
result = ds;
ds.first();
result = etlJobConfig.configToJson();
}
};
SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
Deencapsulation.invoke(job, "initSparkEnvironment");
Deencapsulation.invoke(job, "initConfig");
EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig");
Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
EtlTable table = parsedConfig.tables.get(tableId);
Assert.assertEquals(2, table.indexes.size());
Assert.assertEquals(2, table.partitionInfo.partitions.size());
Assert.assertEquals(false, parsedConfig.properties.strictMode);
Assert.assertEquals("label0", parsedConfig.label);
}
@Test
public void testCheckConfigWithoutBitmapDictColumns() {
SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
Deencapsulation.invoke(job, "checkConfig");
Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns");
// check bitmap dict columns empty
Assert.assertTrue(tableToBitmapDictColumns.isEmpty());
}
@Test
public void testCheckConfigWithBitmapDictColumns() {
SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
EtlTable table = etlJobConfig.tables.get(tableId);
table.indexes.get(0).columns.add(
new EtlColumn("v2", "BITMAP", false, false, "BITMAP_UNION", "0", 0, 0, 0)
);
EtlFileGroup fileGroup = table.fileGroups.get(0);
fileGroup.sourceType = EtlJobConfig.SourceType.HIVE;
fileGroup.columnMappings.put(
"v2", new EtlColumnMapping("bitmap_dict", Lists.newArrayList("v2"))
);
Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
Deencapsulation.invoke(job, "checkConfig");
// check hive source
Set<Long> hiveSourceTables = Deencapsulation.getField(job, "hiveSourceTables");
Assert.assertTrue(hiveSourceTables.contains(tableId));
// check bitmap dict columns has v2
Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns");
Assert.assertTrue(tableToBitmapDictColumns.containsKey(tableId));
Assert.assertTrue(tableToBitmapDictColumns.get(tableId).contains("v2"));
// check remove v2 bitmap_dict func mapping from file group column mappings
Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
}
}

View File

@ -32,6 +32,7 @@ import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DomainResolver;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.EditLog;
@ -1061,7 +1062,7 @@ public class AuthTest {
List<AccessPrivilege> usagePrivileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV);
UserDesc userDesc = new UserDesc(userIdentity, "12345", true);
// TODO(wyb): spark-load
GrantStmt.disableGrantResource = false;
Config.enable_spark_load = true;
// ------ grant|revoke resource to|from user ------
// 1. create user with no role