some little fix for spark load (#5224)

1. use yyyy-MM-dd instead of YYYY-MM-DD
2. unify lower case for bitmap column name
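Background on the first fix: these parsers use Joda-Time patterns, where 'y' is year and 'd' is day-of-month, while 'Y' is year-of-era and 'D' is day-of-year, so the old "YYYY-MM-DD" pattern silently works on the wrong fields. A minimal standalone sketch (class name invented for illustration):

    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;

    public class PatternPitfall {
        public static void main(String[] args) {
            DateTime dt = new DateTime(2020, 3, 5, 0, 0);
            // 'y' = year, 'd' = day-of-month: prints 2020-03-05
            System.out.println(DateTimeFormat.forPattern("yyyy-MM-dd").print(dt));
            // 'D' = day-of-year: prints 2020-03-65, since March 5 is the 65th day of 2020
            System.out.println(DateTimeFormat.forPattern("YYYY-MM-DD").print(dt));
        }
    }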
@@ -21,18 +21,22 @@ import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+
     public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
         String columnType = etlColumn.columnType;
         if (columnType.equalsIgnoreCase("TINYINT")) {
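The two constants above can safely be static because Joda-Time formatters are immutable and thread-safe, unlike java.text.SimpleDateFormat, so a single instance can be shared by all executor threads. A minimal sketch of the validation idiom the patched parsers use (helper class and method are illustrative):

    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    final class DateCheck {
        // immutable, safe to share across threads
        static final DateTimeFormatter DATE = DateTimeFormat.forPattern("yyyy-MM-dd");

        // true only when the whole value matches the pattern
        static boolean isValidDate(String value) {
            try {
                DATE.parseDateTime(value);
                return true;
            } catch (IllegalArgumentException e) {
                return false;
            }
        }
    }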
@@ -158,7 +162,7 @@ class DateParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Date.parse(value);
+            DATE_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
@@ -170,7 +174,7 @@ class DatetimeParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            DateTime.parse(value);
+            DATE_TIME_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
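The replaced calls were too lenient for validation: java.util.Date.parse is deprecated and accepts its own loose syntax, and joda's DateTime.parse is a generic ISO-8601 parser that happily accepts a date-only string for a DATETIME column. A small demo of the difference (assuming plain Joda-Time on the classpath):

    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class DatetimeCheckDemo {
        static final DateTimeFormatter F = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

        public static void main(String[] args) {
            // the generic ISO parser accepts a date-only string...
            System.out.println(DateTime.parse("2020-03-05"));
            // ...while the pinned pattern rejects anything but "yyyy-MM-dd HH:mm:ss"
            try {
                F.parseDateTime("2020-03-05");
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }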
@@ -108,13 +108,17 @@ public final class SparkDpp implements java.io.Serializable {
     // we need to wrap it so that we can use it in executor.
     private SerializableConfiguration serializableHadoopConf;
     private DppResult dppResult = new DppResult();
+    Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
 
     // just for ut
     public SparkDpp() {}
 
-    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig, Map<Long, Set<String>> tableToBitmapDictColumns) {
         this.spark = spark;
         this.etlJobConfig = etlJobConfig;
+        if (tableToBitmapDictColumns != null) {
+            this.tableToBitmapDictColumns = tableToBitmapDictColumns;
+        }
     }
 
     public void init() {
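For context, a hedged sketch of how the new constructor is meant to be wired (the table id and column name are made up; the real call site is processDpp() in SparkEtlJob, shown in the last hunk, and a SparkSession spark plus a parsed EtlJobConfig etlJobConfig are assumed in scope):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // hypothetical: table 10001 has one bitmap column built from the global dict
    Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
    tableToBitmapDictColumns.put(10001L, new HashSet<>(Arrays.asList("user_id")));

    SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig, tableToBitmapDictColumns);
    sparkDpp.init();
    sparkDpp.doDpp();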
@@ -543,7 +547,9 @@ public final class SparkDpp implements java.io.Serializable {
                 }
             }
             if (column.columnType.equalsIgnoreCase("DATE")) {
-                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.DateType));
+            } else if (column.columnType.equalsIgnoreCase("DATETIME")) {
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.TimestampType));
             } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                 dataframe = dataframe.withColumn(dstField.name(),
                         functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
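The "date" string shorthand and DataTypes.DateType should resolve to the same Catalyst type, so the substantive change here is that DATETIME columns now get an explicit cast to TimestampType as well. A standalone illustration (column names invented; df is assumed to hold string columns):

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.DataTypes;

    // "dt" holds "yyyy-MM-dd" strings, "ts" holds "yyyy-MM-dd HH:mm:ss" strings
    Dataset<Row> typed = df
            .withColumn("dt", col("dt").cast(DataTypes.DateType))
            .withColumn("ts", col("ts").cast(DataTypes.TimestampType));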
@@ -844,7 +850,8 @@ public final class SparkDpp implements java.io.Serializable {
                                                 String hiveDbTableName,
                                                 EtlJobConfig.EtlIndex baseIndex,
                                                 EtlJobConfig.EtlFileGroup fileGroup,
-                                                StructType dstTableSchema) throws SparkDppException {
+                                                StructType dstTableSchema,
+                                                Set<String> dictBitmapColumnSet) throws SparkDppException {
         // select base index columns from hive table
         StringBuilder sql = new StringBuilder();
         sql.append("select ");
@@ -857,18 +864,39 @@ public final class SparkDpp implements java.io.Serializable {
         }
 
         Dataset<Row> dataframe = spark.sql(sql.toString());
+        // Note(wb): in the current implementation, spark load can't be fully consistent with doris BE. The reason is as follows:
+        // For stream load in doris BE, it runs the following steps:
+        // step 1: type check
+        // step 2: expression calculation
+        // step 3: strict mode check
+        // step 4: nullable column check
+        // BE can do all four steps row by row,
+        // but spark load relies on spark to do step 2, so it can only do step 1 for the whole dataset, then step 2 for the whole dataset, and so on.
+        // So in spark load, we first do steps 1, 3, 4, and then do step 2.
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
-                dstTableSchema);
+                dstTableSchema, dictBitmapColumnSet);
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
         return dataframe;
     }
 
     private Dataset<Row> checkDataFromHiveWithStrictMode(
-            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema,
+            Set<String> dictBitmapColumnSet) throws SparkDppException {
         List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
         List<ColumnParser> columnParserArrayList = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+            // note(wb): there are three data sources for a bitmap column
+            // case 1: global dict; no check needed
+            // case 2: bitmap hash function; this func is not supported in spark load now, so ignore it here
+            // case 3: origin value is an integer value; it should be checked with a BigIntParser
+            if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
+                if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
+                    continue;
+                } else {
+                    columnNameNeedCheckArrayList.add(column);
+                    columnParserArrayList.add(new BigIntParser());
+                }
+            } else if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
                     !StringUtils.equalsIgnoreCase(column.columnType, "char") &&
                     !mappingColKeys.contains(column.columnName)) {
                 columnNameNeedCheckArrayList.add(column);
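Case 3 above means a bitmap column that is fed raw values must contain integers, which is why the existing BigIntParser is reused for the check. A hedged restatement of what such a parse check amounts to (the real BigIntParser may differ, e.g. in range handling):

    import java.math.BigInteger;

    // sketch: a bitmap source value passes the check only if it is an integer literal
    static boolean looksLikeInteger(String value) {
        try {
            new BigInteger(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }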
@@ -879,6 +907,7 @@ public final class SparkDpp implements java.io.Serializable {
         ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[columnParserArrayList.size()]);
         EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]);
 
+        StructType srcSchema = dataframe.schema();
         JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
             @Override
             public Iterator<Row> call(Row row) throws Exception {
@@ -898,6 +927,11 @@ public final class SparkDpp implements java.io.Serializable {
                             if (isStrictMode) {
                                 validRow = false;
                                 LOG.warn(String.format("row parsed failed in strict mode, column name %s, src row %s", column.columnName, row.toString()));
+                            // a column that failed to parse would be filled with null, but if the doris column is not allowed to be null, we should skip this row
+                            } else if (!column.isAllowNull) {
+                                validRow = false;
+                                LOG.warn("column:" + i + " can not be null. row:" + row.toString());
+                                break;
                             } else {
                                 columnIndexNeedToRepalceNull.add(fieldIndex);
                             }
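The new branch completes the per-cell policy: in strict mode any parse failure invalidates the row; otherwise a failure on a NOT NULL column also invalidates it, and only a failure on a nullable column is repaired by backfilling null. A compact restatement of that decision table (names are illustrative, not the actual code):

    enum CellAction { DROP_ROW, SET_NULL }

    static CellAction onParseFailure(boolean isStrictMode, boolean isAllowNull) {
        if (isStrictMode) {
            return CellAction.DROP_ROW;   // strict mode: any bad value filters the row
        }
        if (!isAllowNull) {
            return CellAction.DROP_ROW;   // cannot backfill null into a NOT NULL column
        }
        return CellAction.SET_NULL;       // lenient: replace the bad value with null
    }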
@@ -909,7 +943,7 @@ public final class SparkDpp implements java.io.Serializable {
                         if (abnormalRowAcc.value() <= 5) {
                             invalidRows.add(row.toString());
                         }
-                    } if (columnIndexNeedToRepalceNull.size() != 0) {
+                    } else if (columnIndexNeedToRepalceNull.size() != 0) {
                         Object[] newRow = new Object[row.size()];
                         for (int i = 0; i < row.size(); i++) {
                             if (columnIndexNeedToRepalceNull.contains(i)) {
@@ -926,7 +960,8 @@ public final class SparkDpp implements java.io.Serializable {
             }
         });
 
-        return spark.createDataFrame(result, dstTableSchema);
+        // here we just check data but do not cast, so the data type should stay the same as the src schema, which is the hive table schema
+        return spark.createDataFrame(result, srcSchema);
     }
 
     private void process() throws Exception {
@@ -934,6 +969,7 @@ public final class SparkDpp implements java.io.Serializable {
         for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
             Long tableId = entry.getKey();
             EtlJobConfig.EtlTable etlTable = entry.getValue();
+            Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>());
 
             // get the base index meta
             EtlJobConfig.EtlIndex baseIndex = null;
@@ -982,7 +1018,8 @@ public final class SparkDpp implements java.io.Serializable {
                 if (sourceType == EtlJobConfig.SourceType.FILE) {
                     fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
                 } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
-                    fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
+                    fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema,
+                            dictBitmapColumnSet);
                 } else {
                     throw new RuntimeException("Unknown source type: " + sourceType.name());
                 }
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2.etl;
 
+import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
 import org.apache.doris.load.loadv2.dpp.SparkDpp;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
@@ -53,6 +54,7 @@ public class SparkEtlJob {
 
     private static final String BITMAP_DICT_FUNC = "bitmap_dict";
     private static final String TO_BITMAP_FUNC = "to_bitmap";
+    private static final String BITMAP_HASH = "bitmap_hash";
 
     private String jobConfigFilePath;
     private EtlJobConfig etlJobConfig;
@@ -112,8 +114,11 @@ public class SparkEtlJob {
             String columnName = mappingEntry.getKey();
             String exprStr = mappingEntry.getValue().toDescription();
             String funcName = functions.expr(exprStr).expr().prettyName();
+            if (funcName.equalsIgnoreCase(BITMAP_HASH)) {
+                throw new SparkDppException("spark load not support bitmap_hash now");
+            }
             if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
-                bitmapDictColumns.add(columnName);
+                bitmapDictColumns.add(columnName.toLowerCase());
             } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
                 newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
             }
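Both sides of the dict-column lookup are now normalized: this set is populated with columnName.toLowerCase(), and checkDataFromHiveWithStrictMode (earlier hunk) probes it with column.columnName.toLowerCase(), so a column declared with mixed case still matches. A two-line illustration (column name invented):

    Set<String> bitmapDictColumns = new HashSet<>();
    bitmapDictColumns.add("UserId".toLowerCase());                            // stored as "userid"
    boolean skipCheck = bitmapDictColumns.contains("USERID".toLowerCase());   // true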
@@ -137,7 +142,7 @@ public class SparkEtlJob {
     }
 
     private void processDpp() throws Exception {
-        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig, tableToBitmapDictColumns);
         sparkDpp.init();
         sparkDpp.doDpp();
     }