[feature](load) support line delimiter for old broker load (#22030)

This commit is contained in:
Siyang Tang
2023-07-21 19:31:19 +08:00
committed by GitHub
parent b76d0d84ac
commit e489b60ea3
9 changed files with 123 additions and 122 deletions

View File

@ -2397,6 +2397,7 @@ data_desc ::=
KW_INTO KW_TABLE ident:tableName
opt_partition_names:partitionNames
opt_field_term:colSep
opt_line_term:lineDelimiter
opt_file_format:fileFormat
opt_col_list:colList
opt_columns_from_path:columnsFromPath
@ -2407,7 +2408,7 @@ data_desc ::=
sequence_col_clause:sequenceColName
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat,
columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName

View File

@ -168,11 +168,32 @@ public class DataDescription implements InsertStmt.DataDesc {
isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null, null);
}
/**
 * Creates a data description without an explicit line delimiter.
 *
 * <p>Every argument is forwarded unchanged to the overload that additionally
 * accepts a {@code lineDelimiter}; {@code null} is supplied for that parameter.
 */
public DataDescription(String tableName,
        PartitionNames partitionNames,
        List<String> filePaths,
        List<String> columns,
        Separator columnSeparator,
        String fileFormat,
        List<String> columnsFromPath,
        boolean isNegative,
        List<Expr> columnMappingList,
        Expr fileFilterExpr,
        Expr whereExpr,
        LoadTask.MergeType mergeType,
        Expr deleteCondition,
        String sequenceColName,
        Map<String, String> properties) {
    this(
            tableName,
            partitionNames,
            filePaths,
            columns,
            columnSeparator,
            null, // lineDelimiter: not provided by this overload
            fileFormat,
            columnsFromPath,
            isNegative,
            columnMappingList,
            fileFilterExpr,
            whereExpr,
            mergeType,
            deleteCondition,
            sequenceColName,
            properties);
}
public DataDescription(String tableName,
PartitionNames partitionNames,
List<String> filePaths,
List<String> columns,
Separator columnSeparator,
Separator lineDelimiter,
String fileFormat,
List<String> columnsFromPath,
boolean isNegative,
@ -188,6 +209,7 @@ public class DataDescription implements InsertStmt.DataDesc {
this.filePaths = filePaths;
this.fileFieldNames = columns;
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
this.fileFormat = fileFormat;
this.columnsFromPath = columnsFromPath;
this.isNegative = isNegative;
@ -597,6 +619,10 @@ public class DataDescription implements InsertStmt.DataDesc {
return lineDelimiter.getSeparator();
}
/**
 * Returns the line delimiter as the stored {@code Separator} object rather than
 * its string form.
 *
 * @return the line delimiter object; {@code null} when none was supplied
 */
public Separator getLineDelimiterObj() {
    return this.lineDelimiter;
}
/**
 * Replaces the line delimiter held by this data description.
 *
 * @param lineDelimiter the new line delimiter; stored as-is, without validation
 */
public void setLineDelimiter(Separator lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}

View File

@ -154,16 +154,10 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
params.put(ExternalFileTableValuedFunction.FORMAT, format);
if (isCsvFormat(format)) {
final Separator separator = dataDescription.getColumnSeparatorObj();
if (separator != null) {
try {
separator.analyze();
} catch (AnalysisException e) {
throw new DdlException("failed to create s3 tvf ref", e);
}
params.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, dataDescription.getColumnSeparator());
}
parseSeparator(dataDescription.getColumnSeparatorObj(), params);
parseSeparator(dataDescription.getLineDelimiterObj(), params);
}
List<String> columnsFromPath = dataDescription.getColumnsFromPath();
@ -187,6 +181,18 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
}
}
/**
 * Analyzes {@code separator} and stores the result of {@code getSeparator()} in
 * {@code tvfParams} under the column-separator key.
 *
 * <p>NOTE(review): this overload always writes to
 * {@code ExternalFileTableValuedFunction.COLUMN_SEPARATOR}. Passing a line delimiter
 * through it overwrites the column separator already stored in the map, so the
 * line-delimiter call site should switch to
 * {@link #parseSeparator(Separator, String, Map)} with the line-delimiter parameter key.
 *
 * @param separator the separator to analyze; {@code null} leaves {@code tvfParams} untouched
 * @param tvfParams the table-valued-function parameter map that receives the value
 * @throws DdlException if {@code separator.analyze()} fails
 */
private static void parseSeparator(Separator separator, Map<String, String> tvfParams) throws DdlException {
    parseSeparator(separator, ExternalFileTableValuedFunction.COLUMN_SEPARATOR, tvfParams);
}

/**
 * Analyzes {@code separator} and stores the result of {@code getSeparator()} in
 * {@code tvfParams} under {@code paramKey}.
 *
 * <p>The key is a parameter so that column separators and line delimiters can be
 * written to distinct entries instead of overwriting each other.
 *
 * @param separator the separator to analyze; {@code null} leaves {@code tvfParams} untouched
 * @param paramKey  the key under which the analyzed separator is stored
 * @param tvfParams the table-valued-function parameter map that receives the value
 * @throws DdlException if {@code separator.analyze()} fails; the original
 *                      {@code AnalysisException} is kept as the cause
 */
private static void parseSeparator(Separator separator, String paramKey, Map<String, String> tvfParams)
        throws DdlException {
    if (separator == null) {
        // Nothing was specified in the statement; keep whatever default applies downstream.
        return;
    }
    try {
        separator.analyze();
    } catch (AnalysisException e) {
        throw new DdlException(String.format("failed to parse separator:%s", separator), e);
    }
    tvfParams.put(paramKey, separator.getSeparator());
}
private static boolean isCsvFormat(String format) {
return Strings.isNullOrEmpty(format) || StringUtils.equalsIgnoreCase(format, FORMAT_CSV);
}