Add support for column references in the LOAD statement (#1162)

This commit is contained in:
ZHAO Chun
2019-05-15 20:26:10 +08:00
committed by GitHub
parent b24fab48cd
commit f985ea99fc
9 changed files with 88 additions and 35 deletions

View File

@ -198,7 +198,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
KW_ELSE, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT,
KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIRST, KW_FLOAT, KW_FOR, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION,
KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION,
KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP,
KW_HASH, KW_HAVING, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HUB,
KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE,
@ -403,7 +403,7 @@ nonterminal List<AlterClause> alter_table_clause_list;
//
nonterminal String keyword, ident, ident_or_text, variable_name, text_or_password,
charset_name_or_default, old_or_new_charset_name_or_default, opt_collate,
collation_name_or_default, type_func_name_keyword, type_function_name;
collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format;
nonterminal String opt_db, opt_partition_name, procedure_or_function, opt_default_value, opt_comment, opt_engine;
nonterminal Boolean opt_if_exists, opt_if_not_exists;
@ -1044,10 +1044,11 @@ data_desc ::=
KW_INTO KW_TABLE ident:tableName
opt_partitions:partitionNames
opt_field_term:colSep
opt_file_format:fileFormat
opt_col_list:colList
opt_col_mapping_list:colMappingList
{:
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, isNeg, colMappingList);
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, isNeg, colMappingList);
:}
;
@ -1097,6 +1098,13 @@ column_separator ::=
:}
;
opt_file_format ::=
/* Empty */
{: RESULT = null; :}
| KW_FORMAT KW_AS ident_or_text:format
{: RESULT = format; :}
;
opt_col_list ::=
{:
RESULT = null;
@ -3897,6 +3905,8 @@ keyword ::=
{: RESULT = id; :}
| KW_FIRST:id
{: RESULT = id; :}
| KW_FORMAT:id
{: RESULT = id; :}
| KW_FUNCTION:id
{: RESULT = id; :}
| KW_END:id

View File

@ -50,6 +50,7 @@ import java.util.Set;
// INTO TABLE tbl_name
// [PARTITION (p1, p2)]
// [COLUMNS TERMINATED BY separator]
// [FORMAT AS format]
// [(col1, ...)]
// [SET (k1=f1(xx), k2=f2(xx))]
public class DataDescription {
@ -57,9 +58,10 @@ public class DataDescription {
public static String FUNCTION_HASH_HLL = "hll_hash";
private final String tableName;
private final List<String> partitionNames;
private final List<String> filePathes;
private final List<String> filePaths;
private final List<String> columnNames;
private final ColumnSeparator columnSeparator;
private final String fileFormat;
private final boolean isNegative;
private final List<Expr> columnMappingList;
@ -74,16 +76,35 @@ public class DataDescription {
public DataDescription(String tableName,
List<String> partitionNames,
List<String> filePathes,
List<String> filePaths,
List<String> columnNames,
ColumnSeparator columnSeparator,
boolean isNegative,
List<Expr> columnMappingList) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePathes = filePathes;
this.filePaths = filePaths;
this.columnNames = columnNames;
this.columnSeparator = columnSeparator;
this.fileFormat = null;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
}
public DataDescription(String tableName,
List<String> partitionNames,
List<String> filePaths,
List<String> columnNames,
ColumnSeparator columnSeparator,
String fileFormat,
boolean isNegative,
List<Expr> columnMappingList) {
this.tableName = tableName;
this.partitionNames = partitionNames;
this.filePaths = filePaths;
this.columnNames = columnNames;
this.columnSeparator = columnSeparator;
this.fileFormat = fileFormat;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
}
@ -96,14 +117,16 @@ public class DataDescription {
return partitionNames;
}
public List<String> getFilePathes() {
return filePathes;
public List<String> getFilePaths() {
return filePaths;
}
public List<String> getColumnNames() {
return columnNames;
}
public String getFileFormat() { return fileFormat; }
public String getColumnSeparator() {
if (columnSeparator == null) {
return null;
@ -204,10 +227,15 @@ public class DataDescription {
if (columnToFunction.containsKey(column)) {
throw new AnalysisException("Duplicate column mapping: " + column);
}
// We support either a function call or a column reference to rename/derive a column.
Expr child1 = predicate.getChild(1);
if (!(child1 instanceof FunctionCallExpr)) {
throw new AnalysisException("Mapping function error, function: " + child1.toSql());
if (isPullLoad && child1 instanceof SlotRef) {
// we only support SlotRef in pull load
} else {
throw new AnalysisException("Mapping function error, function: " + child1.toSql());
}
}
if (!child1.supportSerializable()) {
@ -216,6 +244,12 @@ public class DataDescription {
parsedExprMap.put(column, child1);
if (!(child1 instanceof FunctionCallExpr)) {
// Insert a placeholder entry only so the later duplicate-mapping check passes.
columnToFunction.put(column, Pair.create("__slot_ref", Lists.newArrayList()));
continue;
}
FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1;
String functionName = functionCallExpr.getFnName().getFunction();
List<Expr> paramExprs = functionCallExpr.getParams().exprs();
@ -413,11 +447,11 @@ public class DataDescription {
ConnectContext.get().getRemoteIP(), tableName);
}
if (filePathes == null || filePathes.isEmpty()) {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
}
for (int i = 0; i < filePathes.size(); ++i) {
filePathes.set(i, filePathes.get(i).trim());
for (int i = 0; i < filePaths.size(); ++i) {
filePaths.set(i, filePaths.get(i).trim());
}
if (columnSeparator != null) {
@ -431,7 +465,7 @@ public class DataDescription {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("DATA INFILE (");
Joiner.on(", ").appendTo(sb, Lists.transform(filePathes, new Function<String, String>() {
Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function<String, String>() {
@Override
public String apply(String s) {
return "'" + s + "'";

View File

@ -17,7 +17,6 @@
package org.apache.doris.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
@ -57,15 +56,17 @@ public class BrokerFileGroup implements Writable {
private long tableId;
private String valueSeparator;
private String lineDelimiter;
// fileFormat may be null, which means the format will be decided by the file's suffix.
// TODO(zc): we need to persist fileFormat, this should be done in next META_VERSION increase
private String fileFormat;
private boolean isNegative;
private List<Long> partitionIds;
private List<String> fileFieldNames;
private List<String> filePathes;
private List<String> filePaths;
// These columns need an expression to compute their values.
private Map<String, Expr> exprColumnMap;
// Used for recovery from edit log
private BrokerFileGroup() {
}
@ -76,7 +77,7 @@ public class BrokerFileGroup implements Writable {
this.valueSeparator = ColumnSeparator.convertSeparator(table.getColumnSeparator());
this.lineDelimiter = table.getLineDelimiter();
this.isNegative = false;
this.filePathes = table.getPaths();
this.filePaths = table.getPaths();
}
public BrokerFileGroup(DataDescription dataDescription) {
@ -129,10 +130,13 @@ public class BrokerFileGroup implements Writable {
if (lineDelimiter == null) {
lineDelimiter = "\n";
}
fileFormat = dataDescription.getFileFormat();
isNegative = dataDescription.isNegative();
// FilePath
filePathes = dataDescription.getFilePathes();
filePaths = dataDescription.getFilePaths();
}
public long getTableId() {
@ -146,6 +150,7 @@ public class BrokerFileGroup implements Writable {
public String getLineDelimiter() {
return lineDelimiter;
}
public String getFileFormat() { return fileFormat; }
public boolean isNegative() {
return isNegative;
@ -163,8 +168,8 @@ public class BrokerFileGroup implements Writable {
return dataDescription.getPartitionNames();
}
public List<String> getFilePathes() {
return filePathes;
public List<String> getFilePaths() {
return filePaths;
}
public Map<String, Expr> getExprColumnMap() {
@ -202,7 +207,7 @@ public class BrokerFileGroup implements Writable {
.append(",isNegative=").append(isNegative);
sb.append(",fileInfos=[");
int idx = 0;
for (String path : filePathes) {
for (String path : filePaths) {
if (idx++ != 0) {
sb.append(",");
}
@ -243,9 +248,9 @@ public class BrokerFileGroup implements Writable {
Text.writeString(out, name);
}
}
// filePathes
out.writeInt(filePathes.size());
for (String path : filePathes) {
// filePaths
out.writeInt(filePaths.size());
for (String path : filePaths) {
Text.writeString(out, path);
}
// expr column map
@ -290,9 +295,9 @@ public class BrokerFileGroup implements Writable {
// fileInfos
{
int size = in.readInt();
filePathes = Lists.newArrayList();
filePaths = Lists.newArrayList();
for (int i = 0; i < size; ++i) {
filePathes.add(Text.readString(in));
filePaths.add(Text.readString(in));
}
}
// expr column map

View File

@ -62,7 +62,6 @@ import org.apache.doris.common.LoadException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.AsyncDeleteJob.DeleteState;
@ -651,7 +650,7 @@ public class Load {
Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
boolean deleteFlag)
throws DdlException {
Source source = new Source(dataDescription.getFilePathes());
Source source = new Source(dataDescription.getFilePaths());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();

View File

@ -68,7 +68,7 @@ public class BrokerLoadPendingTask extends LoadTask {
List<BrokerFileGroup> fileGroups = entry.getValue();
for (BrokerFileGroup fileGroup : fileGroups) {
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePathes()) {
for (String path : fileGroup.getFilePaths()) {
BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
}
fileStatusList.add(fileStatuses);

View File

@ -323,7 +323,9 @@ public class BrokerScanNode extends ScanNode {
params.setProperties(brokerDesc.getProperties());
context.exprMap = fileGroup.getExprColumnMap();
// We must create a new map here, because we will change this map later.
// But fileGroup will be persisted later, so we keep it unchanged.
context.exprMap = Maps.newHashMap(fileGroup.getExprColumnMap());
parseExprMap(context.exprMap);
// Generate expr
@ -488,7 +490,7 @@ public class BrokerScanNode extends ScanNode {
filesAdded = 0;
for (BrokerFileGroup fileGroup : fileGroups) {
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePathes()) {
for (String path : fileGroup.getFilePaths()) {
BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
}
fileStatusesList.add(fileStatuses);
@ -553,7 +555,9 @@ public class BrokerScanNode extends ScanNode {
}
}
// If fileFormat is not null, we use it instead of checking the file's suffix.
private void processFileGroup(
String fileFormat,
TBrokerScanRangeParams params,
List<TBrokerFileStatus> fileStatuses)
throws UserException {
@ -640,7 +644,7 @@ public class BrokerScanNode extends ScanNode {
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
processFileGroup(context.params, fileStatuses);
processFileGroup(context.fileGroup.getFileFormat(), context.params, fileStatuses);
}
if (LOG.isDebugEnabled()) {
for (TScanRangeLocations locations : locationsList) {

View File

@ -114,7 +114,7 @@ public class PullLoadPendingTask extends LoadPendingTask {
List<BrokerFileGroup> fileGroups = entry.getValue();
for (BrokerFileGroup fileGroup : fileGroups) {
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePathes()) {
for (String path : fileGroup.getFilePaths()) {
BrokerUtil.parseBrokerFile(path, job.getBrokerDesc(), fileStatuses);
}
fileStatusList.add(fileStatuses);

View File

@ -153,6 +153,7 @@ import org.apache.doris.common.util.SqlUtils;
keywordMap.put("follower", new Integer(SqlParserSymbols.KW_FOLLOWER));
keywordMap.put("following", new Integer(SqlParserSymbols.KW_FOLLOWING));
keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR));
keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT));
keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM));
keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND));
keywordMap.put("frontends", new Integer(SqlParserSymbols.KW_FRONTENDS));

View File

@ -72,7 +72,7 @@ public class DataDescriptionTest {
Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString());
Assert.assertEquals("testTable", desc.getTableName());
Assert.assertEquals("[col1, col2]", desc.getColumnNames().toString());
Assert.assertEquals("[abc.txt]", desc.getFilePathes().toString());
Assert.assertEquals("[abc.txt]", desc.getFilePaths().toString());
Assert.assertTrue(desc.isNegative());
Assert.assertNull(desc.getColumnSeparator());