[Spark load][Fe 1/5] Add spark etl job config (#3712)

Add spark etl job config, includes:

1. Schema of the load tables, including columns, partitions and rollups
2. Infos of the source file, including split rules, corresponding columns, and conversion rules
3. ETL output directory and file name format
4. Job properties
5. Version for further extension
This commit is contained in:
wyb
2020-06-03 11:23:09 +08:00
committed by GitHub
parent 3194aa129d
commit ad7270b7ca

View File

@ -0,0 +1,571 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2.etl;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/** jobconfig.json file format
{
"tables": {
10014: {
"indexes": [{
"indexId": 10014,
"columns": [{
"columnName": "k1",
"columnType": "SMALLINT",
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "k2",
"columnType": "VARCHAR",
"stringLength": 20,
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "v",
"columnType": "BIGINT",
"isKey": false,
"isAllowNull": false,
"aggregationType": "NONE"
}],
"schemaHash": 1294206574,
"indexType": "DUPLICATE",
"isBaseIndex": true
}, {
"indexId": 10017,
"columns": [{
"columnName": "k1",
"columnType": "SMALLINT",
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "v",
"columnType": "BIGINT",
"isKey": false,
"isAllowNull": false,
"aggregationType": "BITMAP_UNION",
"defineExpr": "to_bitmap(v)"
}],
"schemaHash": 1294206575,
"indexType": "AGGREGATE",
"isBaseIndex": false
}],
"partitionInfo": {
"partitionType": "RANGE",
"partitionColumnRefs": ["k1"],
"distributionColumnRefs": ["k2"],
"partitions": [{
"partitionId": 10020,
"startKeys": [-100],
"endKeys": [10],
"isMaxPartition": false,
"bucketNum": 3
}, {
"partitionId": 10021,
"startKeys": [10],
"endKeys": [100],
"isMaxPartition": false,
"bucketNum": 3
}]
},
"fileGroups": [{
"partitions": [10020],
"filePaths": ["hdfs://hdfs_host:port/user/palo/test/file"],
"fileFieldNames": ["tmp_k1", "k2"],
"valueSeparator": ",",
"lineDelimiter": "\n",
"columnMappings": {
"k1": {
"functionName": "strftime",
"args": ["%Y-%m-%d %H:%M:%S", "tmp_k1"]
}
},
"where": "k2 > 10",
"isNegative": false,
"hiveTableName": "hive_db.table"
}]
}
},
"outputPath": "hdfs://hdfs_host:port/user/output/10003/label1/1582599203397",
"outputFilePattern": "V1.label1.%d.%d.%d.%d.%d.parquet",
"label": "label0",
"properties": {
"strictMode": false,
"timezone": "Asia/Shanghai"
},
"version": "V1"
}
*/
public class EtlJobConfig implements Serializable {
// global dict
public static final String GLOBAL_DICT_TABLE_NAME = "doris_global_dict_table_%d";
public static final String DISTINCT_KEY_TABLE_NAME = "doris_distinct_key_table_%d_%s";
public static final String DORIS_INTERMEDIATE_HIVE_TABLE_NAME = "doris_intermediate_hive_table_%d_%s";
// hdfsEtlPath/jobs/dbId/loadLabel/PendingTaskSignature
private static final String ETL_OUTPUT_PATH_FORMAT = "%s/jobs/%d/%s/%d";
private static final String ETL_OUTPUT_FILE_NAME_DESC_V1 = "version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet";
// tableId.partitionId.indexId.bucket.schemaHash
public static final String TABLET_META_FORMAT = "%d.%d.%d.%d.%d";
public static final String ETL_OUTPUT_FILE_FORMAT = "parquet";
// dpp result
public static final String DPP_RESULT_NAME = "dpp_result.json";
@SerializedName(value = "tables")
public Map<Long, EtlTable> tables;
@SerializedName(value = "outputPath")
public String outputPath;
@SerializedName(value = "outputFilePattern")
public String outputFilePattern;
@SerializedName(value = "label")
public String label;
@SerializedName(value = "properties")
public EtlJobProperty properties;
@SerializedName(value = "configVersion")
public ConfigVersion configVersion;
public EtlJobConfig(Map<Long, EtlTable> tables, String outputFilePattern, String label, EtlJobProperty properties) {
this.tables = tables;
// set outputPath when submit etl job
this.outputPath = null;
this.outputFilePattern = outputFilePattern;
this.label = label;
this.properties = properties;
this.configVersion = ConfigVersion.V1;
}
@Override
public String toString() {
return "EtlJobConfig{" +
"tables=" + tables +
", outputPath='" + outputPath + '\'' +
", outputFilePattern='" + outputFilePattern + '\'' +
", label='" + label + '\'' +
", properties=" + properties +
", version=" + configVersion +
'}';
}
public String getOutputPath() {
return outputPath;
}
public static String getOutputPath(String hdfsEtlPath, long dbId, String loadLabel, long taskSignature) {
return String.format(ETL_OUTPUT_PATH_FORMAT, hdfsEtlPath, dbId, loadLabel, taskSignature);
}
public static String getOutputFilePattern(String loadLabel, FilePatternVersion filePatternVersion) {
return String.format("%s.%s.%s.%s", filePatternVersion.name(), loadLabel, TABLET_META_FORMAT, ETL_OUTPUT_FILE_FORMAT);
}
public static String getDppResultFilePath(String outputPath) {
return outputPath + "/" + DPP_RESULT_NAME;
}
public static String getTabletMetaStr(String filePath) throws Exception {
String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
String[] fileNameArr = fileName.split("\\.");
// check file version
switch (FilePatternVersion.valueOf(fileNameArr[0])) {
case V1:
// version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet
if (fileNameArr.length != ETL_OUTPUT_FILE_NAME_DESC_V1.split("\\.").length) {
throw new Exception("etl output file name error, format: " + ETL_OUTPUT_FILE_NAME_DESC_V1
+ ", name: " + fileName);
}
long tableId = Long.parseLong(fileNameArr[2]);
long partitionId = Long.parseLong(fileNameArr[3]);
long indexId = Long.parseLong(fileNameArr[4]);
int bucket = Integer.parseInt(fileNameArr[5]);
int schemaHash = Integer.parseInt(fileNameArr[6]);
// tableId.partitionId.indexId.bucket.schemaHash
return String.format(TABLET_META_FORMAT, tableId, partitionId, indexId, bucket, schemaHash);
default:
throw new Exception("etl output file version error. version: " + fileNameArr[0]);
}
}
public String configToJson() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy());
Gson gson = gsonBuilder.create();
return gson.toJson(this);
}
public static EtlJobConfig configFromJson(String jsonConfig) {
GsonBuilder gsonBuilder = new GsonBuilder();
Gson gson = gsonBuilder.create();
return gson.fromJson(jsonConfig, EtlJobConfig.class);
}
public static class EtlJobProperty implements Serializable {
@SerializedName(value = "strictMode")
public boolean strictMode;
@SerializedName(value = "timezone")
public String timezone;
@Override
public String toString() {
return "EtlJobProperty{" +
"strictMode=" + strictMode +
", timezone='" + timezone + '\'' +
'}';
}
}
public static enum ConfigVersion {
V1
}
public static enum FilePatternVersion {
V1
}
public static class EtlTable implements Serializable {
@SerializedName(value = "indexes")
public List<EtlIndex> indexes;
@SerializedName(value = "partitionInfo")
public EtlPartitionInfo partitionInfo;
@SerializedName(value = "fileGroups")
public List<EtlFileGroup> fileGroups;
public EtlTable(List<EtlIndex> etlIndexes, EtlPartitionInfo etlPartitionInfo) {
this.indexes = etlIndexes;
this.partitionInfo = etlPartitionInfo;
this.fileGroups = Lists.newArrayList();
}
public void addFileGroup(EtlFileGroup etlFileGroup) {
fileGroups.add(etlFileGroup);
}
@Override
public String toString() {
return "EtlTable{" +
"indexes=" + indexes +
", partitionInfo=" + partitionInfo +
", fileGroups=" + fileGroups +
'}';
}
}
public static class EtlColumn implements Serializable {
@SerializedName(value = "columnName")
public String columnName;
@SerializedName(value = "columnType")
public String columnType;
@SerializedName(value = "isAllowNull")
public boolean isAllowNull;
@SerializedName(value = "isKey")
public boolean isKey;
@SerializedName(value = "aggregationType")
public String aggregationType;
@SerializedName(value = "defaultValue")
public String defaultValue;
@SerializedName(value = "stringLength")
public int stringLength;
@SerializedName(value = "precision")
public int precision;
@SerializedName(value = "scale")
public int scale;
@SerializedName(value = "defineExpr")
public String defineExpr;
// for unit test
public EtlColumn() { }
public EtlColumn(String columnName, String columnType, boolean isAllowNull, boolean isKey,
String aggregationType, String defaultValue, int stringLength, int precision, int scale) {
this.columnName = columnName;
this.columnType = columnType;
this.isAllowNull = isAllowNull;
this.isKey = isKey;
this.aggregationType = aggregationType;
this.defaultValue = defaultValue;
this.stringLength = stringLength;
this.precision = precision;
this.scale = scale;
this.defineExpr = null;
}
@Override
public String toString() {
return "EtlColumn{" +
"columnName='" + columnName + '\'' +
", columnType='" + columnType + '\'' +
", isAllowNull=" + isAllowNull +
", isKey=" + isKey +
", aggregationType='" + aggregationType + '\'' +
", defaultValue='" + defaultValue + '\'' +
", stringLength=" + stringLength +
", precision=" + precision +
", scale=" + scale +
", defineExpr='" + defineExpr + '\'' +
'}';
}
}
public static class EtlIndexComparator implements Comparator<EtlIndex> {
@Override
public int compare(EtlIndex a, EtlIndex b) {
int diff = a.columns.size() - b.columns.size();
if (diff == 0) {
return 0;
} else if (diff > 0) {
return 1;
} else {
return -1;
}
}
}
public static class EtlIndex implements Serializable {
@SerializedName(value = "indexId")
public long indexId;
@SerializedName(value = "columns")
public List<EtlColumn> columns;
@SerializedName(value = "schemaHash")
public int schemaHash;
@SerializedName(value = "indexType")
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
String indexType, boolean isBaseIndex) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
}
public EtlColumn getColumn(String name) {
for (EtlColumn column : columns) {
if (column.columnName.equals(name)) {
return column;
}
}
return null;
}
@Override
public String toString() {
return "EtlIndex{" +
"indexId=" + indexId +
", columns=" + columns +
", schemaHash=" + schemaHash +
", indexType='" + indexType + '\'' +
", isBaseIndex=" + isBaseIndex +
'}';
}
}
public static class EtlPartitionInfo implements Serializable {
@SerializedName(value = "partitionType")
public String partitionType;
@SerializedName(value = "partitionColumnRefs")
public List<String> partitionColumnRefs;
@SerializedName(value = "distributionColumnRefs")
public List<String> distributionColumnRefs;
@SerializedName(value = "partitions")
public List<EtlPartition> partitions;
public EtlPartitionInfo(String partitionType, List<String> partitionColumnRefs,
List<String> distributionColumnRefs, List<EtlPartition> etlPartitions) {
this.partitionType = partitionType;
this.partitionColumnRefs = partitionColumnRefs;
this.distributionColumnRefs = distributionColumnRefs;
this.partitions = etlPartitions;
}
@Override
public String toString() {
return "EtlPartitionInfo{" +
"partitionType='" + partitionType + '\'' +
", partitionColumnRefs=" + partitionColumnRefs +
", distributionColumnRefs=" + distributionColumnRefs +
", partitions=" + partitions +
'}';
}
}
public static class EtlPartition implements Serializable {
@SerializedName(value = "partitionId")
public long partitionId;
@SerializedName(value = "startKeys")
public List<Object> startKeys;
@SerializedName(value = "endKeys")
public List<Object> endKeys;
@SerializedName(value = "isMaxPartition")
public boolean isMaxPartition;
@SerializedName(value = "bucketNum")
public int bucketNum;
public EtlPartition(long partitionId, List<Object> startKeys, List<Object> endKeys,
boolean isMaxPartition, int bucketNum) {
this.partitionId = partitionId;
this.startKeys = startKeys;
this.endKeys = endKeys;
this.isMaxPartition = isMaxPartition;
this.bucketNum = bucketNum;
}
@Override
public String toString() {
return "EtlPartition{" +
"partitionId=" + partitionId +
", startKeys=" + startKeys +
", endKeys=" + endKeys +
", isMaxPartition=" + isMaxPartition +
", bucketNum=" + bucketNum +
'}';
}
}
public static class EtlFileGroup implements Serializable {
@SerializedName(value = "filePaths")
public List<String> filePaths;
@SerializedName(value = "fileFieldNames")
public List<String> fileFieldNames;
@SerializedName(value = "columnsFromPath")
public List<String> columnsFromPath;
@SerializedName(value = "columnSeparator")
public String columnSeparator;
@SerializedName(value = "lineDelimiter")
public String lineDelimiter;
@SerializedName(value = "isNegative")
public boolean isNegative;
@SerializedName(value = "fileFormat")
public String fileFormat;
@SerializedName(value = "columnMappings")
public Map<String, EtlColumnMapping> columnMappings;
@SerializedName(value = "where")
public String where;
@SerializedName(value = "partitions")
public List<Long> partitions;
@SerializedName(value = "hiveTableName")
public String hiveTableName;
public EtlFileGroup(List<String> filePaths, List<String> fileFieldNames, List<String> columnsFromPath,
String columnSeparator, String lineDelimiter, boolean isNegative, String fileFormat,
Map<String, EtlColumnMapping> columnMappings, String where, List<Long> partitions) {
this.filePaths = filePaths;
this.fileFieldNames = fileFieldNames;
this.columnsFromPath = columnsFromPath;
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
this.isNegative = isNegative;
this.fileFormat = fileFormat;
this.columnMappings = columnMappings;
this.where = where;
this.partitions = partitions;
}
@Override
public String toString() {
return "EtlFileGroup{" +
"filePaths=" + filePaths +
", fileFieldNames=" + fileFieldNames +
", columnsFromPath=" + columnsFromPath +
", columnSeparator='" + columnSeparator + '\'' +
", lineDelimiter='" + lineDelimiter + '\'' +
", isNegative=" + isNegative +
", fileFormat='" + fileFormat + '\'' +
", columnMappings=" + columnMappings +
", where='" + where + '\'' +
", partitions=" + partitions +
", hiveTableName='" + hiveTableName + '\'' +
'}';
}
}
/**
* FunctionCallExpr = functionName(args)
* For compatibility with old designed functions used in Hadoop MapReduce etl
*
* expr is more general, like k1 + 1, not just FunctionCall
*/
public static class EtlColumnMapping implements Serializable {
@SerializedName(value = "functionName")
public String functionName;
@SerializedName(value = "args")
public List<String> args;
@SerializedName(value = "expr")
public String expr;
private static Map<String, String> functionMap = new ImmutableMap.Builder<String, String>()
.put("md5sum", "md5").build();
public EtlColumnMapping(String functionName, List<String> args) {
this.functionName = functionName;
this.args = args;
}
public EtlColumnMapping(String expr) {
this.expr = expr;
}
public String toDescription() {
StringBuilder sb = new StringBuilder();
if (functionName == null) {
sb.append(expr);
} else {
if (functionMap.containsKey(functionName)) {
sb.append(functionMap.get(functionName));
} else {
sb.append(functionName);
}
sb.append("(");
if (args != null) {
for (String arg : args) {
sb.append(arg);
sb.append(",");
}
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
}
return sb.toString();
}
@Override
public String toString() {
return "EtlColumnMapping{" +
"functionName='" + functionName + '\'' +
", args=" + args +
", expr=" + expr +
'}';
}
}
}