diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 6e5a714f1b..fd71add5f9 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -62,6 +62,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -553,12 +554,13 @@ public final class SparkDpp implements java.io.Serializable {
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
             parsers.add(ColumnParser.create(column));
         }
+        char separator = (char) fileGroup.columnSeparator.getBytes(Charset.forName("UTF-8"))[0];
         // now we first support csv file
         // TODO: support parquet file and orc file
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
-                    String[] attributes = record.split(fileGroup.columnSeparator);
+                    String[] attributes = splitLine(record, separator);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
@@ -640,6 +642,25 @@ public final class SparkDpp implements java.io.Serializable {
         return srcSchema;
     }
 
+    // This method is to keep the splitting consistent with broker load / mini load
+    private String[] splitLine(String line, char sep) {
+        if (line == null || line.equals("")) {
+            return new String[0];
+        }
+        int index = 0;
+        int lastIndex = 0;
+        // line-begin char and line-end char are considered to be 'delimiter'
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < line.length(); i++, index++) {
+            if (line.charAt(index) == sep) {
+                values.add(line.substring(lastIndex, index));
+                lastIndex = index + 1;
+            }
+        }
+        values.add(line.substring(lastIndex, index));
+        return values.toArray(new String[0]);
+    }
+
     // partition keys will be parsed into double from json
     // so need to convert it to partition columns' type
     private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException {
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9ee4d83688..8238aea7e5 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -503,20 +503,13 @@ public class EtlJobConfig implements Serializable {
             this.filePaths = filePaths;
             this.fileFieldNames = fileFieldNames;
             this.columnsFromPath = columnsFromPath;
+            this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? "\t" : columnSeparator;
             this.lineDelimiter = lineDelimiter;
             this.isNegative = isNegative;
             this.fileFormat = fileFormat;
             this.columnMappings = columnMappings;
             this.where = where;
             this.partitions = partitions;
-
-            // Convert some special characters in column separator
-            char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
-            if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
-                this.columnSeparator = new String(new char[]{'\\', sep});
-            } else {
-                this.columnSeparator = Character.toString(sep);
-            }
         }
 
         // for data from table
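
For context, a minimal standalone sketch (not part of the patch; the SplitLineDemo class name and main method are illustrative only) of why the new splitLine helper matters: String.split treats the separator as a regular expression and drops trailing empty fields, while the character-by-character scan treats the separator literally and keeps every column, matching broker load / mini load behavior. This is also why EtlJobConfig no longer needs to regex-escape the separator: the raw separator string is stored, and SparkDpp takes its first byte as the literal delimiter character.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; mirrors the splitLine logic added to SparkDpp above.
public class SplitLineDemo {
    static String[] splitLine(String line, char sep) {
        if (line == null || line.isEmpty()) {
            return new String[0];
        }
        List<String> values = new ArrayList<>();
        int lastIndex = 0;
        for (int index = 0; index < line.length(); index++) {
            if (line.charAt(index) == sep) {
                values.add(line.substring(lastIndex, index));
                lastIndex = index + 1;
            }
        }
        // trailing field (possibly empty) after the last separator
        values.add(line.substring(lastIndex));
        return values.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String record = "a|b||";
        // Old behavior: regex-based split, trailing empty columns are lost.
        System.out.println(Arrays.toString(record.split("\\|")));    // [a, b]
        // New behavior: literal split, empty columns are preserved.
        System.out.println(Arrays.toString(splitLine(record, '|'))); // [a, b, , ]
    }
}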