diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 949d966109..77c905e9ba 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -198,7 +198,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, KW_ELSE, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT, - KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIRST, KW_FLOAT, KW_FOR, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, + KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP, KW_HASH, KW_HAVING, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HUB, KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, @@ -403,7 +403,7 @@ nonterminal List alter_table_clause_list; // nonterminal String keyword, ident, ident_or_text, variable_name, text_or_password, charset_name_or_default, old_or_new_charset_name_or_default, opt_collate, - collation_name_or_default, type_func_name_keyword, type_function_name; + collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format; nonterminal String opt_db, opt_partition_name, procedure_or_function, opt_default_value, opt_comment, opt_engine; nonterminal Boolean opt_if_exists, opt_if_not_exists; @@ -1044,10 +1044,11 @@ data_desc ::= KW_INTO KW_TABLE ident:tableName opt_partitions:partitionNames opt_field_term:colSep + opt_file_format:fileFormat opt_col_list:colList opt_col_mapping_list:colMappingList {: - RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, isNeg, colMappingList); + RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, isNeg, colMappingList); :} ; @@ -1097,6 +1098,13 @@ column_separator ::= :} ; +opt_file_format ::= + /* Empty */ + {: RESULT = null; :} + | KW_FORMAT KW_AS ident_or_text:format + {: RESULT = format; :} + ; + opt_col_list ::= {: RESULT = null; @@ -3897,6 +3905,8 @@ keyword ::= {: RESULT = id; :} | KW_FIRST:id {: RESULT = id; :} + | KW_FORMAT:id + {: RESULT = id; :} | KW_FUNCTION:id {: RESULT = id; :} | KW_END:id diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 98e822bf65..683eb3449a 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -50,6 +50,7 @@ import java.util.Set; // INTO TABLE tbl_name // [PARTITION (p1, p2)] // [COLUMNS TERMINATED BY separator] +// [FORMAT AS format] // [(col1, ...)] // [SET (k1=f1(xx), k2=f2(xx))] public class DataDescription { @@ -57,9 +58,10 @@ public class DataDescription { public static String FUNCTION_HASH_HLL = "hll_hash"; private final String tableName; private final List partitionNames; - private final List filePathes; + private final List filePaths; private final List columnNames; private final ColumnSeparator columnSeparator; + private final String fileFormat; private final boolean isNegative; private final List columnMappingList; @@ -74,16 +76,35 @@ public class DataDescription { public DataDescription(String tableName, List partitionNames, - List filePathes, + List filePaths, List columnNames, ColumnSeparator columnSeparator, boolean isNegative, List columnMappingList) { this.tableName = tableName; this.partitionNames = partitionNames; - this.filePathes = filePathes; + this.filePaths = filePaths; this.columnNames = columnNames; this.columnSeparator = columnSeparator; + this.fileFormat = null; + this.isNegative = isNegative; + this.columnMappingList = columnMappingList; + } + + public DataDescription(String tableName, + List partitionNames, + List filePaths, + List columnNames, + ColumnSeparator columnSeparator, + String fileFormat, + boolean isNegative, + List columnMappingList) { + this.tableName = tableName; + this.partitionNames = partitionNames; + this.filePaths = filePaths; + this.columnNames = columnNames; + this.columnSeparator = columnSeparator; + this.fileFormat = fileFormat; this.isNegative = isNegative; this.columnMappingList = columnMappingList; } @@ -96,14 +117,16 @@ public class DataDescription { return partitionNames; } - public List getFilePathes() { - return filePathes; + public List getFilePaths() { + return filePaths; } public List getColumnNames() { return columnNames; } + public String getFileFormat() { return fileFormat; } + public String getColumnSeparator() { if (columnSeparator == null) { return null; @@ -204,10 +227,15 @@ public class DataDescription { if (columnToFunction.containsKey(column)) { throw new AnalysisException("Duplicate column mapping: " + column); } - + + // we support function and column reference to change a column name Expr child1 = predicate.getChild(1); if (!(child1 instanceof FunctionCallExpr)) { - throw new AnalysisException("Mapping function error, function: " + child1.toSql()); + if (isPullLoad && child1 instanceof SlotRef) { + // we only support SlotRef in pull load + } else { + throw new AnalysisException("Mapping function error, function: " + child1.toSql()); + } } if (!child1.supportSerializable()) { @@ -216,6 +244,12 @@ public class DataDescription { parsedExprMap.put(column, child1); + if (!(child1 instanceof FunctionCallExpr)) { + // only just for pass later check + columnToFunction.put(column, Pair.create("__slot_ref", Lists.newArrayList())); + continue; + } + FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1; String functionName = functionCallExpr.getFnName().getFunction(); List paramExprs = functionCallExpr.getParams().exprs(); @@ -413,11 +447,11 @@ public class DataDescription { ConnectContext.get().getRemoteIP(), tableName); } - if (filePathes == null || filePathes.isEmpty()) { + if (filePaths == null || filePaths.isEmpty()) { throw new AnalysisException("No file path in load statement."); } - for (int i = 0; i < filePathes.size(); ++i) { - filePathes.set(i, filePathes.get(i).trim()); + for (int i = 0; i < filePaths.size(); ++i) { + filePaths.set(i, filePaths.get(i).trim()); } if (columnSeparator != null) { @@ -431,7 +465,7 @@ public class DataDescription { public String toSql() { StringBuilder sb = new StringBuilder(); sb.append("DATA INFILE ("); - Joiner.on(", ").appendTo(sb, Lists.transform(filePathes, new Function() { + Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function() { @Override public String apply(String s) { return "'" + s + "'"; diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index e9a2fcd3fb..7f42616161 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -17,7 +17,6 @@ package org.apache.doris.load; -import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; @@ -57,15 +56,17 @@ public class BrokerFileGroup implements Writable { private long tableId; private String valueSeparator; private String lineDelimiter; + // fileFormat may be null, which means format will be decided by file's suffix + // TODO(zc): we need to persist fileFormat, this should be done in next META_VERSION increase + private String fileFormat; private boolean isNegative; private List partitionIds; private List fileFieldNames; - private List filePathes; + private List filePaths; // This column need expression to get column private Map exprColumnMap; - // Used for recovery from edit log private BrokerFileGroup() { } @@ -76,7 +77,7 @@ public class BrokerFileGroup implements Writable { this.valueSeparator = ColumnSeparator.convertSeparator(table.getColumnSeparator()); this.lineDelimiter = table.getLineDelimiter(); this.isNegative = false; - this.filePathes = table.getPaths(); + this.filePaths = table.getPaths(); } public BrokerFileGroup(DataDescription dataDescription) { @@ -129,10 +130,13 @@ public class BrokerFileGroup implements Writable { if (lineDelimiter == null) { lineDelimiter = "\n"; } + + fileFormat = dataDescription.getFileFormat(); + isNegative = dataDescription.isNegative(); // FilePath - filePathes = dataDescription.getFilePathes(); + filePaths = dataDescription.getFilePaths(); } public long getTableId() { @@ -146,6 +150,7 @@ public class BrokerFileGroup implements Writable { public String getLineDelimiter() { return lineDelimiter; } + public String getFileFormat() { return fileFormat; } public boolean isNegative() { return isNegative; @@ -163,8 +168,8 @@ public class BrokerFileGroup implements Writable { return dataDescription.getPartitionNames(); } - public List getFilePathes() { - return filePathes; + public List getFilePaths() { + return filePaths; } public Map getExprColumnMap() { @@ -202,7 +207,7 @@ public class BrokerFileGroup implements Writable { .append(",isNegative=").append(isNegative); sb.append(",fileInfos=["); int idx = 0; - for (String path : filePathes) { + for (String path : filePaths) { if (idx++ != 0) { sb.append(","); } @@ -243,9 +248,9 @@ public class BrokerFileGroup implements Writable { Text.writeString(out, name); } } - // filePathes - out.writeInt(filePathes.size()); - for (String path : filePathes) { + // filePaths + out.writeInt(filePaths.size()); + for (String path : filePaths) { Text.writeString(out, path); } // expr column map @@ -290,9 +295,9 @@ public class BrokerFileGroup implements Writable { // fileInfos { int size = in.readInt(); - filePathes = Lists.newArrayList(); + filePaths = Lists.newArrayList(); for (int i = 0; i < size; ++i) { - filePathes.add(Text.readString(in)); + filePaths.add(Text.readString(in)); } } // expr column map diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 405bfe3c1b..982c2705d8 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -62,7 +62,6 @@ import org.apache.doris.common.LoadException; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.util.ListComparator; -import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.AsyncDeleteJob.DeleteState; @@ -651,7 +650,7 @@ public class Load { Map>> tableToPartitionSources, boolean deleteFlag) throws DdlException { - Source source = new Source(dataDescription.getFilePathes()); + Source source = new Source(dataDescription.getFilePaths()); long tableId = -1; Set sourcePartitionIds = Sets.newHashSet(); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 3f2a6bd5b5..f95cd9567c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -68,7 +68,7 @@ public class BrokerLoadPendingTask extends LoadTask { List fileGroups = entry.getValue(); for (BrokerFileGroup fileGroup : fileGroups) { List fileStatuses = Lists.newArrayList(); - for (String path : fileGroup.getFilePathes()) { + for (String path : fileGroup.getFilePaths()) { BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); } fileStatusList.add(fileStatuses); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index fbc6327d3d..dec1aad63f 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -323,7 +323,9 @@ public class BrokerScanNode extends ScanNode { params.setProperties(brokerDesc.getProperties()); - context.exprMap = fileGroup.getExprColumnMap(); + // We must create a new map here, because we will change this map later. + // But fileGroup will be persisted later, so we keep it unchanged. + context.exprMap = Maps.newHashMap(fileGroup.getExprColumnMap()); parseExprMap(context.exprMap); // Generate expr @@ -488,7 +490,7 @@ public class BrokerScanNode extends ScanNode { filesAdded = 0; for (BrokerFileGroup fileGroup : fileGroups) { List fileStatuses = Lists.newArrayList(); - for (String path : fileGroup.getFilePathes()) { + for (String path : fileGroup.getFilePaths()) { BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); } fileStatusesList.add(fileStatuses); @@ -553,7 +555,9 @@ public class BrokerScanNode extends ScanNode { } } + // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( + String fileFormat, TBrokerScanRangeParams params, List fileStatuses) throws UserException { @@ -640,7 +644,7 @@ public class BrokerScanNode extends ScanNode { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - processFileGroup(context.params, fileStatuses); + processFileGroup(context.fileGroup.getFileFormat(), context.params, fileStatuses); } if (LOG.isDebugEnabled()) { for (TScanRangeLocations locations : locationsList) { diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java index be0aba0bc8..f820169d46 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadPendingTask.java @@ -114,7 +114,7 @@ public class PullLoadPendingTask extends LoadPendingTask { List fileGroups = entry.getValue(); for (BrokerFileGroup fileGroup : fileGroups) { List fileStatuses = Lists.newArrayList(); - for (String path : fileGroup.getFilePathes()) { + for (String path : fileGroup.getFilePaths()) { BrokerUtil.parseBrokerFile(path, job.getBrokerDesc(), fileStatuses); } fileStatusList.add(fileStatuses); diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index f82e39c21f..7e217cbb15 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -153,6 +153,7 @@ import org.apache.doris.common.util.SqlUtils; keywordMap.put("follower", new Integer(SqlParserSymbols.KW_FOLLOWER)); keywordMap.put("following", new Integer(SqlParserSymbols.KW_FOLLOWING)); keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR)); + keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT)); keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM)); keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND)); keywordMap.put("frontends", new Integer(SqlParserSymbols.KW_FRONTENDS)); diff --git a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index 1b518856fd..950b8d3cc0 100644 --- a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -72,7 +72,7 @@ public class DataDescriptionTest { Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString()); Assert.assertEquals("testTable", desc.getTableName()); Assert.assertEquals("[col1, col2]", desc.getColumnNames().toString()); - Assert.assertEquals("[abc.txt]", desc.getFilePathes().toString()); + Assert.assertEquals("[abc.txt]", desc.getFilePaths().toString()); Assert.assertTrue(desc.isNegative()); Assert.assertNull(desc.getColumnSeparator());