[Spark Load][Bug] Keep the column splitting in spark load consistent with broker load / mini load (#4532)
@@ -62,6 +62,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -553,12 +554,13 @@ public final class SparkDpp implements java.io.Serializable {
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
             parsers.add(ColumnParser.create(column));
         }
+        char separator = (char)fileGroup.columnSeparator.getBytes(Charset.forName("UTF-8"))[0];
         // now we first support csv file
         // TODO: support parquet file and orc file
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
-                    String[] attributes = record.split(fileGroup.columnSeparator);
+                    String[] attributes = splitLine(record, separator);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
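Note on the added separator line: it takes the first UTF-8 byte of the configured separator string and casts it to a char, which only round-trips for single-byte (ASCII) separators. A minimal standalone sketch of that conversion (the class name SeparatorCharDemo is ours, not part of the patch):

import java.nio.charset.StandardCharsets;

public class SeparatorCharDemo {
    public static void main(String[] args) {
        // Same conversion as the added hunk line: take the first UTF-8 byte
        // of the configured separator string and cast it to a char.
        String columnSeparator = "\t";
        char separator = (char) columnSeparator.getBytes(StandardCharsets.UTF_8)[0];
        System.out.println((int) separator); // prints 9, i.e. '\t'
        // Caveat (our observation): a multi-byte, non-ASCII separator would be
        // truncated to its first byte here; the cast assumes single-byte separators.
    }
}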
@@ -640,6 +642,25 @@ public final class SparkDpp implements java.io.Serializable {
         return srcSchema;
     }
 
+    // This method keeps the splitting consistent with broker load / mini load
+    private String[] splitLine(String line, char sep) {
+        if (line == null || line.equals("")) {
+            return new String[0];
+        }
+        int index = 0;
+        int lastIndex = 0;
+        // line-begin char and line-end char are considered to be 'delimiter'
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < line.length(); i++, index++) {
+            if (line.charAt(index) == sep) {
+                values.add(line.substring(lastIndex, index));
+                lastIndex = index + 1;
+            }
+        }
+        values.add(line.substring(lastIndex, index));
+        return values.toArray(new String[0]);
+    }
+
     // partition keys will be parsed into double from json
     // so need to convert it to partition columns' type
     private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException {
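Why this matters: Java's String.split drops trailing empty substrings, while broker load / mini load treat line-begin and line-end as delimiters, so trailing empty columns still count as fields (and the attributes.length != columnSize check above behaves consistently). A self-contained sketch of the difference; the class SplitLineDemo and its simplified copy of splitLine are ours, for illustration only:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLineDemo {
    // Simplified copy of the splitLine logic from the hunk above.
    static String[] splitLine(String line, char sep) {
        if (line == null || line.isEmpty()) {
            return new String[0];
        }
        List<String> values = new ArrayList<>();
        int lastIndex = 0;
        for (int index = 0; index < line.length(); index++) {
            if (line.charAt(index) == sep) {
                values.add(line.substring(lastIndex, index));
                lastIndex = index + 1;
            }
        }
        // The end of the line closes the last field, even when it is empty.
        values.add(line.substring(lastIndex));
        return values.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String record = "1,2,,";
        // String.split discards trailing empty fields: [1, 2]
        System.out.println(Arrays.toString(record.split(",")));
        // splitLine keeps them, like broker load / mini load: [1, 2, , ]
        System.out.println(Arrays.toString(splitLine(record, ',')));
    }
}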
@@ -503,20 +503,13 @@ public class EtlJobConfig implements Serializable {
             this.filePaths = filePaths;
             this.fileFieldNames = fileFieldNames;
             this.columnsFromPath = columnsFromPath;
+            this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? "\t" : columnSeparator;
             this.lineDelimiter = lineDelimiter;
             this.isNegative = isNegative;
             this.fileFormat = fileFormat;
             this.columnMappings = columnMappings;
             this.where = where;
             this.partitions = partitions;
-
-            // Convert some special characters in column separator
-            char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
-            if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
-                this.columnSeparator = new String(new char[]{'\\', sep});
-            } else {
-                this.columnSeparator = Character.toString(sep);
-            }
         }
 
         // for data from table
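The escaping block removed above existed because String.split interprets its argument as a regular expression, so metacharacters like '|' had to be stored pre-escaped. splitLine compares raw characters, so the constructor now stores the separator verbatim; keeping the escaping would have made splitLine look for a literal backslash instead of the separator. A small sketch of the old pitfall (SeparatorEscapeDemo is a hypothetical name):

import java.util.Arrays;

public class SeparatorEscapeDemo {
    public static void main(String[] args) {
        String line = "a|b|c";
        // String.split treats its argument as a regex. Unescaped, '|' matches
        // the empty string and splits between every character (Java 8+):
        System.out.println(Arrays.toString(line.split("|")));   // [a, |, b, |, c]
        // The old EtlJobConfig escaping made this work with String.split:
        System.out.println(Arrays.toString(line.split("\\|"))); // [a, b, c]
        // splitLine compares raw chars, so it needs the plain '|' and the
        // escaping had to be dropped.
    }
}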