From ad7270b7ca7c021a85b419a4b2007dcf0407fdcf Mon Sep 17 00:00:00 2001
From: wyb
Date: Wed, 3 Jun 2020 11:23:09 +0800
Subject: [PATCH] [Spark load][Fe 1/5] Add spark etl job config (#3712)

Add spark etl job config, which includes:
1. Schema of the load tables, including columns, partitions and rollups
2. Info about the source files, including split rules, corresponding columns, and conversion rules
3. ETL output directory and file name format
4. Job properties
5. Version for further extension
---
 .../doris/load/loadv2/etl/EtlJobConfig.java   | 571 ++++++++++++++++++
 1 file changed, 571 insertions(+)
 create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java

diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
new file mode 100644
index 0000000000..341633c1ba
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.etl;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** jobconfig.json file format
+{
+    "tables": {
+        10014: {
+            "indexes": [{
+                "indexId": 10014,
+                "columns": [{
+                    "columnName": "k1",
+                    "columnType": "SMALLINT",
+                    "isKey": true,
+                    "isAllowNull": true,
+                    "aggregationType": "NONE"
+                }, {
+                    "columnName": "k2",
+                    "columnType": "VARCHAR",
+                    "stringLength": 20,
+                    "isKey": true,
+                    "isAllowNull": true,
+                    "aggregationType": "NONE"
+                }, {
+                    "columnName": "v",
+                    "columnType": "BIGINT",
+                    "isKey": false,
+                    "isAllowNull": false,
+                    "aggregationType": "NONE"
+                }],
+                "schemaHash": 1294206574,
+                "indexType": "DUPLICATE",
+                "isBaseIndex": true
+            }, {
+                "indexId": 10017,
+                "columns": [{
+                    "columnName": "k1",
+                    "columnType": "SMALLINT",
+                    "isKey": true,
+                    "isAllowNull": true,
+                    "aggregationType": "NONE"
+                }, {
+                    "columnName": "v",
+                    "columnType": "BIGINT",
+                    "isKey": false,
+                    "isAllowNull": false,
+                    "aggregationType": "BITMAP_UNION",
+                    "defineExpr": "to_bitmap(v)"
+                }],
+                "schemaHash": 1294206575,
+                "indexType": "AGGREGATE",
+                "isBaseIndex": false
+            }],
+            "partitionInfo": {
+                "partitionType": "RANGE",
+                "partitionColumnRefs": ["k1"],
+                "distributionColumnRefs": ["k2"],
+                "partitions": [{
+                    "partitionId": 10020,
+                    "startKeys": [-100],
+                    "endKeys": [10],
+                    "isMaxPartition": false,
+                    "bucketNum": 3
+                }, {
+                    "partitionId": 10021,
+                    "startKeys": [10],
+                    "endKeys": [100],
+                    "isMaxPartition": false,
+                    "bucketNum": 3
+                }]
+            },
+            "fileGroups": [{
+                "partitions": [10020],
+                "filePaths": ["hdfs://hdfs_host:port/user/palo/test/file"],
+                "fileFieldNames": ["tmp_k1", "k2"],
+                "columnSeparator": ",",
+                "lineDelimiter": "\n",
+                "columnMappings": {
+                    "k1": {
+                        "functionName": "strftime",
+                        "args": ["%Y-%m-%d %H:%M:%S", "tmp_k1"]
+                    }
+                },
+                "where": "k2 > 10",
+                "isNegative": false,
+                "hiveTableName": "hive_db.table"
+            }]
+        }
+    },
+    "outputPath": "hdfs://hdfs_host:port/user/output/10003/label1/1582599203397",
+    "outputFilePattern": "V1.label1.%d.%d.%d.%d.%d.parquet",
+    "label": "label0",
+    "properties": {
+        "strictMode": false,
+        "timezone": "Asia/Shanghai"
+    },
+    "version": "V1"
+}
+ */
+public class EtlJobConfig implements Serializable {
+    // global dict
+    public static final String GLOBAL_DICT_TABLE_NAME = "doris_global_dict_table_%d";
+    public static final String DISTINCT_KEY_TABLE_NAME = "doris_distinct_key_table_%d_%s";
+    public static final String DORIS_INTERMEDIATE_HIVE_TABLE_NAME = "doris_intermediate_hive_table_%d_%s";
+
+    // hdfsEtlPath/jobs/dbId/loadLabel/PendingTaskSignature
+    private static final String ETL_OUTPUT_PATH_FORMAT = "%s/jobs/%d/%s/%d";
+    private static final String ETL_OUTPUT_FILE_NAME_DESC_V1 = "version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet";
+    // tableId.partitionId.indexId.bucket.schemaHash
+    public static final String TABLET_META_FORMAT = "%d.%d.%d.%d.%d";
+    public static final String ETL_OUTPUT_FILE_FORMAT = "parquet";
+
+    // dpp result
+    public static final String DPP_RESULT_NAME = "dpp_result.json";
+
+    @SerializedName(value = "tables")
+    public Map<Long, EtlTable> tables;
+    @SerializedName(value = "outputPath")
+    public String outputPath;
+    @SerializedName(value = "outputFilePattern")
+    public String outputFilePattern;
@SerializedName(value = "label") + public String label; + @SerializedName(value = "properties") + public EtlJobProperty properties; + @SerializedName(value = "configVersion") + public ConfigVersion configVersion; + + public EtlJobConfig(Map tables, String outputFilePattern, String label, EtlJobProperty properties) { + this.tables = tables; + // set outputPath when submit etl job + this.outputPath = null; + this.outputFilePattern = outputFilePattern; + this.label = label; + this.properties = properties; + this.configVersion = ConfigVersion.V1; + } + + @Override + public String toString() { + return "EtlJobConfig{" + + "tables=" + tables + + ", outputPath='" + outputPath + '\'' + + ", outputFilePattern='" + outputFilePattern + '\'' + + ", label='" + label + '\'' + + ", properties=" + properties + + ", version=" + configVersion + + '}'; + } + + public String getOutputPath() { + return outputPath; + } + + public static String getOutputPath(String hdfsEtlPath, long dbId, String loadLabel, long taskSignature) { + return String.format(ETL_OUTPUT_PATH_FORMAT, hdfsEtlPath, dbId, loadLabel, taskSignature); + } + + public static String getOutputFilePattern(String loadLabel, FilePatternVersion filePatternVersion) { + return String.format("%s.%s.%s.%s", filePatternVersion.name(), loadLabel, TABLET_META_FORMAT, ETL_OUTPUT_FILE_FORMAT); + } + + public static String getDppResultFilePath(String outputPath) { + return outputPath + "/" + DPP_RESULT_NAME; + } + + public static String getTabletMetaStr(String filePath) throws Exception { + String fileName = filePath.substring(filePath.lastIndexOf("/") + 1); + String[] fileNameArr = fileName.split("\\."); + // check file version + switch (FilePatternVersion.valueOf(fileNameArr[0])) { + case V1: + // version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet + if (fileNameArr.length != ETL_OUTPUT_FILE_NAME_DESC_V1.split("\\.").length) { + throw new Exception("etl output file name error, format: " + ETL_OUTPUT_FILE_NAME_DESC_V1 + + ", name: " + fileName); + } + long tableId = Long.parseLong(fileNameArr[2]); + long partitionId = Long.parseLong(fileNameArr[3]); + long indexId = Long.parseLong(fileNameArr[4]); + int bucket = Integer.parseInt(fileNameArr[5]); + int schemaHash = Integer.parseInt(fileNameArr[6]); + // tableId.partitionId.indexId.bucket.schemaHash + return String.format(TABLET_META_FORMAT, tableId, partitionId, indexId, bucket, schemaHash); + default: + throw new Exception("etl output file version error. 
version: " + fileNameArr[0]); + } + } + + public String configToJson() { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy()); + Gson gson = gsonBuilder.create(); + return gson.toJson(this); + } + + public static EtlJobConfig configFromJson(String jsonConfig) { + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + return gson.fromJson(jsonConfig, EtlJobConfig.class); + } + + public static class EtlJobProperty implements Serializable { + @SerializedName(value = "strictMode") + public boolean strictMode; + @SerializedName(value = "timezone") + public String timezone; + + @Override + public String toString() { + return "EtlJobProperty{" + + "strictMode=" + strictMode + + ", timezone='" + timezone + '\'' + + '}'; + } + } + + public static enum ConfigVersion { + V1 + } + + public static enum FilePatternVersion { + V1 + } + + public static class EtlTable implements Serializable { + @SerializedName(value = "indexes") + public List indexes; + @SerializedName(value = "partitionInfo") + public EtlPartitionInfo partitionInfo; + @SerializedName(value = "fileGroups") + public List fileGroups; + + public EtlTable(List etlIndexes, EtlPartitionInfo etlPartitionInfo) { + this.indexes = etlIndexes; + this.partitionInfo = etlPartitionInfo; + this.fileGroups = Lists.newArrayList(); + } + + public void addFileGroup(EtlFileGroup etlFileGroup) { + fileGroups.add(etlFileGroup); + } + + @Override + public String toString() { + return "EtlTable{" + + "indexes=" + indexes + + ", partitionInfo=" + partitionInfo + + ", fileGroups=" + fileGroups + + '}'; + } + } + + public static class EtlColumn implements Serializable { + @SerializedName(value = "columnName") + public String columnName; + @SerializedName(value = "columnType") + public String columnType; + @SerializedName(value = "isAllowNull") + public boolean isAllowNull; + @SerializedName(value = "isKey") + public boolean isKey; + @SerializedName(value = "aggregationType") + public String aggregationType; + @SerializedName(value = "defaultValue") + public String defaultValue; + @SerializedName(value = "stringLength") + public int stringLength; + @SerializedName(value = "precision") + public int precision; + @SerializedName(value = "scale") + public int scale; + @SerializedName(value = "defineExpr") + public String defineExpr; + + // for unit test + public EtlColumn() { } + + public EtlColumn(String columnName, String columnType, boolean isAllowNull, boolean isKey, + String aggregationType, String defaultValue, int stringLength, int precision, int scale) { + this.columnName = columnName; + this.columnType = columnType; + this.isAllowNull = isAllowNull; + this.isKey = isKey; + this.aggregationType = aggregationType; + this.defaultValue = defaultValue; + this.stringLength = stringLength; + this.precision = precision; + this.scale = scale; + this.defineExpr = null; + } + + @Override + public String toString() { + return "EtlColumn{" + + "columnName='" + columnName + '\'' + + ", columnType='" + columnType + '\'' + + ", isAllowNull=" + isAllowNull + + ", isKey=" + isKey + + ", aggregationType='" + aggregationType + '\'' + + ", defaultValue='" + defaultValue + '\'' + + ", stringLength=" + stringLength + + ", precision=" + precision + + ", scale=" + scale + + ", defineExpr='" + defineExpr + '\'' + + '}'; + } + } + + public static class EtlIndexComparator implements Comparator { + @Override + public int compare(EtlIndex a, EtlIndex b) { + int diff = 
+            if (diff == 0) {
+                return 0;
+            } else if (diff > 0) {
+                return 1;
+            } else {
+                return -1;
+            }
+        }
+    }
+
+    public static class EtlIndex implements Serializable {
+        @SerializedName(value = "indexId")
+        public long indexId;
+        @SerializedName(value = "columns")
+        public List<EtlColumn> columns;
+        @SerializedName(value = "schemaHash")
+        public int schemaHash;
+        @SerializedName(value = "indexType")
+        public String indexType;
+        @SerializedName(value = "isBaseIndex")
+        public boolean isBaseIndex;
+
+        public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
+                        String indexType, boolean isBaseIndex) {
+            this.indexId = indexId;
+            this.columns = etlColumns;
+            this.schemaHash = schemaHash;
+            this.indexType = indexType;
+            this.isBaseIndex = isBaseIndex;
+        }
+
+        public EtlColumn getColumn(String name) {
+            for (EtlColumn column : columns) {
+                if (column.columnName.equals(name)) {
+                    return column;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return "EtlIndex{" +
+                    "indexId=" + indexId +
+                    ", columns=" + columns +
+                    ", schemaHash=" + schemaHash +
+                    ", indexType='" + indexType + '\'' +
+                    ", isBaseIndex=" + isBaseIndex +
+                    '}';
+        }
+    }
+
+    public static class EtlPartitionInfo implements Serializable {
+        @SerializedName(value = "partitionType")
+        public String partitionType;
+        @SerializedName(value = "partitionColumnRefs")
+        public List<String> partitionColumnRefs;
+        @SerializedName(value = "distributionColumnRefs")
+        public List<String> distributionColumnRefs;
+        @SerializedName(value = "partitions")
+        public List<EtlPartition> partitions;
+
+        public EtlPartitionInfo(String partitionType, List<String> partitionColumnRefs,
+                                List<String> distributionColumnRefs, List<EtlPartition> etlPartitions) {
+            this.partitionType = partitionType;
+            this.partitionColumnRefs = partitionColumnRefs;
+            this.distributionColumnRefs = distributionColumnRefs;
+            this.partitions = etlPartitions;
+        }
+
+        @Override
+        public String toString() {
+            return "EtlPartitionInfo{" +
+                    "partitionType='" + partitionType + '\'' +
+                    ", partitionColumnRefs=" + partitionColumnRefs +
+                    ", distributionColumnRefs=" + distributionColumnRefs +
+                    ", partitions=" + partitions +
+                    '}';
+        }
+    }
+
+    public static class EtlPartition implements Serializable {
+        @SerializedName(value = "partitionId")
+        public long partitionId;
+        @SerializedName(value = "startKeys")
+        public List<Object> startKeys;
+        @SerializedName(value = "endKeys")
+        public List<Object> endKeys;
+        @SerializedName(value = "isMaxPartition")
+        public boolean isMaxPartition;
+        @SerializedName(value = "bucketNum")
+        public int bucketNum;
+
+        public EtlPartition(long partitionId, List<Object> startKeys, List<Object> endKeys,
+                            boolean isMaxPartition, int bucketNum) {
+            this.partitionId = partitionId;
+            this.startKeys = startKeys;
+            this.endKeys = endKeys;
+            this.isMaxPartition = isMaxPartition;
+            this.bucketNum = bucketNum;
+        }
+
+        @Override
+        public String toString() {
+            return "EtlPartition{" +
+                    "partitionId=" + partitionId +
+                    ", startKeys=" + startKeys +
+                    ", endKeys=" + endKeys +
+                    ", isMaxPartition=" + isMaxPartition +
+                    ", bucketNum=" + bucketNum +
+                    '}';
+        }
+    }
+
+    public static class EtlFileGroup implements Serializable {
+        @SerializedName(value = "filePaths")
+        public List<String> filePaths;
+        @SerializedName(value = "fileFieldNames")
+        public List<String> fileFieldNames;
+        @SerializedName(value = "columnsFromPath")
+        public List<String> columnsFromPath;
+        @SerializedName(value = "columnSeparator")
+        public String columnSeparator;
+        @SerializedName(value = "lineDelimiter")
+        public String lineDelimiter;
+        @SerializedName(value = "isNegative")
+        public boolean isNegative;
+        @SerializedName(value = "fileFormat")
+        public String fileFormat;
+        @SerializedName(value = "columnMappings")
+        public Map<String, EtlColumnMapping> columnMappings;
+        @SerializedName(value = "where")
+        public String where;
+        @SerializedName(value = "partitions")
+        public List<Long> partitions;
+        @SerializedName(value = "hiveTableName")
+        public String hiveTableName;
+
+        public EtlFileGroup(List<String> filePaths, List<String> fileFieldNames, List<String> columnsFromPath,
+                            String columnSeparator, String lineDelimiter, boolean isNegative, String fileFormat,
+                            Map<String, EtlColumnMapping> columnMappings, String where, List<Long> partitions) {
+            this.filePaths = filePaths;
+            this.fileFieldNames = fileFieldNames;
+            this.columnsFromPath = columnsFromPath;
+            this.columnSeparator = columnSeparator;
+            this.lineDelimiter = lineDelimiter;
+            this.isNegative = isNegative;
+            this.fileFormat = fileFormat;
+            this.columnMappings = columnMappings;
+            this.where = where;
+            this.partitions = partitions;
+        }
+
+        @Override
+        public String toString() {
+            return "EtlFileGroup{" +
+                    "filePaths=" + filePaths +
+                    ", fileFieldNames=" + fileFieldNames +
+                    ", columnsFromPath=" + columnsFromPath +
+                    ", columnSeparator='" + columnSeparator + '\'' +
+                    ", lineDelimiter='" + lineDelimiter + '\'' +
+                    ", isNegative=" + isNegative +
+                    ", fileFormat='" + fileFormat + '\'' +
+                    ", columnMappings=" + columnMappings +
+                    ", where='" + where + '\'' +
+                    ", partitions=" + partitions +
+                    ", hiveTableName='" + hiveTableName + '\'' +
+                    '}';
+        }
+    }
+
+    /**
+     * FunctionCallExpr = functionName(args)
+     * For compatibility with functions from the old Hadoop MapReduce etl design
+     *
+     * expr is more general, like k1 + 1, not just a FunctionCall
+     */
+    public static class EtlColumnMapping implements Serializable {
+        @SerializedName(value = "functionName")
+        public String functionName;
+        @SerializedName(value = "args")
+        public List<String> args;
+        @SerializedName(value = "expr")
+        public String expr;
+
+        private static Map<String, String> functionMap = new ImmutableMap.Builder<String, String>()
+                .put("md5sum", "md5").build();
+
+        public EtlColumnMapping(String functionName, List<String> args) {
+            this.functionName = functionName;
+            this.args = args;
+        }
+
+        public EtlColumnMapping(String expr) {
+            this.expr = expr;
+        }
+
+        public String toDescription() {
+            StringBuilder sb = new StringBuilder();
+            if (functionName == null) {
+                sb.append(expr);
+            } else {
+                if (functionMap.containsKey(functionName)) {
+                    sb.append(functionMap.get(functionName));
+                } else {
+                    sb.append(functionName);
+                }
+                sb.append("(");
+                if (args != null && !args.isEmpty()) {
+                    for (String arg : args) {
+                        sb.append(arg);
+                        sb.append(",");
+                    }
+                    // drop the trailing comma added by the loop
+                    sb.deleteCharAt(sb.length() - 1);
+                }
+                sb.append(")");
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "EtlColumnMapping{" +
+                    "functionName='" + functionName + '\'' +
+                    ", args=" + args +
+                    ", expr=" + expr +
+                    '}';
+        }
+    }
+}
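
A minimal usage sketch for reviewers, not part of the patch: it builds a one-table config with the classes added above, round-trips it through configToJson/configFromJson, and shows how an ETL output file name maps back to tablet meta. The wrapper class EtlJobConfigExample, the ids, paths, and the "csv" file format value are invented for illustration.

// Hypothetical reviewer sketch; only the EtlJobConfig API from the diff above is assumed.
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.*;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.Map;

public class EtlJobConfigExample {
    public static void main(String[] args) throws Exception {
        // Base index schema: one SMALLINT key and one BIGINT value (ids are made up).
        EtlColumn k1 = new EtlColumn("k1", "SMALLINT", true, true, "NONE", null, 0, 0, 0);
        EtlColumn v = new EtlColumn("v", "BIGINT", false, false, "NONE", null, 0, 0, 0);
        EtlIndex baseIndex = new EtlIndex(10014L, Lists.newArrayList(k1, v), 1294206574, "DUPLICATE", true);

        // One range partition on k1, distributed by k1 into 3 buckets.
        EtlPartition partition = new EtlPartition(10020L, Lists.<Object>newArrayList(-100),
                Lists.<Object>newArrayList(10), false, 3);
        EtlPartitionInfo partitionInfo = new EtlPartitionInfo("RANGE", Lists.newArrayList("k1"),
                Lists.newArrayList("k1"), Lists.newArrayList(partition));

        // Source file group: column k1 is derived from source field tmp_k1 via a function mapping.
        Map<String, EtlColumnMapping> mappings = Maps.newHashMap();
        mappings.put("k1", new EtlColumnMapping("strftime", Lists.newArrayList("%Y-%m-%d %H:%M:%S", "tmp_k1")));
        EtlTable table = new EtlTable(Lists.newArrayList(baseIndex), partitionInfo);
        table.addFileGroup(new EtlFileGroup(Lists.newArrayList("hdfs://hdfs_host:port/user/palo/test/file"),
                Lists.newArrayList("tmp_k1", "v"), Lists.<String>newArrayList(), ",", "\n", false, "csv",
                mappings, "v > 0", Lists.newArrayList(10020L)));

        Map<Long, EtlTable> tables = Maps.newHashMap();
        tables.put(10014L, table);

        EtlJobProperty properties = new EtlJobProperty();
        properties.strictMode = false;
        properties.timezone = "Asia/Shanghai";

        // Pattern becomes "V1.label1.%d.%d.%d.%d.%d.parquet"; outputPath is filled in at submit time.
        String pattern = EtlJobConfig.getOutputFilePattern("label1", FilePatternVersion.V1);
        EtlJobConfig config = new EtlJobConfig(tables, pattern, "label1", properties);
        config.outputPath = EtlJobConfig.getOutputPath("hdfs://hdfs_host:port/user/etl", 10003L, "label1", 1582599203397L);

        // Round-trip through JSON, as the Spark etl job would consume it.
        EtlJobConfig parsed = EtlJobConfig.configFromJson(config.configToJson());
        System.out.println(parsed);

        // An output file produced under the V1 pattern maps back to tablet meta.
        System.out.println(EtlJobConfig.getTabletMetaStr(
                config.outputPath + "/V1.label1.10014.10020.10014.0.1294206574.parquet"));
        // prints: 10014.10020.10014.0.1294206574
    }
}

The round trip relies on the Gson annotations on every field, so the jobconfig.json produced here matches the format documented in the class-level comment of the diff.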