[enhancement](broker-load) support compress type for old broker load, and split compress type from file format (#23882)

Author: Siyang Tang
Date: 2023-09-14 21:42:28 +08:00 (committed by GitHub)
Parent: 0488c87a38
Commit: f61e6483bf
13 changed files with 292 additions and 105 deletions
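
The user-facing change is a new optional clause in the broker load data_desc: COMPRESS_TYPE AS <type> can now be written right after FORMAT AS <format>, so the compression codec is declared separately from the file format string instead of being squeezed into it or guessed from the file suffix alone. Below is a minimal Java sketch of how a declared value is expected to flow through the helpers touched in this commit; the codec and path literals are made up for illustration.

import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TFileCompressType;

public class CompressTypeSketch {
    public static void main(String[] args) {
        // Declared explicitly, e.g. COMPRESS_TYPE AS "GZ" in the load statement.
        TFileCompressType declared = Util.getFileCompressType("GZ");   // GZ

        // Not declared: getFileCompressType(null) falls back to UNKNOWN ...
        TFileCompressType unset = Util.getFileCompressType(null);      // UNKNOWN

        // ... and the planner then infers the codec from the file name
        // (hypothetical path; the .gz suffix is expected to map to GZ).
        TFileCompressType inferred = Util.getOrInferCompressType(
                unset, "hdfs://host:8020/user/load/part-0.csv.gz");

        System.out.println(declared + " " + unset + " " + inferred);
    }
}

Since getFileCompressType delegates to TFileCompressType.valueOf on the upper-cased string, an unrecognized explicit value would be expected to fail with an IllegalArgumentException rather than be silently ignored.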


@@ -304,6 +304,7 @@ terminal String
KW_COMMITTED,
KW_COMPACT,
KW_COMPLETE,
KW_COMPRESS_TYPE,
KW_CONFIG,
KW_CONNECTION,
KW_CONNECTION_ID,
@@ -904,8 +905,8 @@ nonterminal GroupByClause group_by_clause, grouping_elements;
//
nonterminal String keyword, ident, ident_or_text, variable_name,
charset_name_or_default, old_or_new_charset_name_or_default, opt_collate,
collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, time_unit,
literal_or_ident;
collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, opt_file_compress_type,
time_unit, literal_or_ident;
nonterminal PassVar text_or_password;
// sync job
@@ -2433,6 +2434,7 @@ data_desc ::=
opt_field_term:colSep
opt_line_term:lineDelimiter
opt_file_format:fileFormat
opt_file_compress_type:fileCompressType
opt_col_list:colList
opt_columns_from_path:columnsFromPath
opt_col_mapping_list:colMappingList
@@ -2442,7 +2444,7 @@ data_desc ::=
sequence_col_clause:sequenceColName
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat,
RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat, fileCompressType,
columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
@@ -2549,6 +2551,13 @@ opt_file_format ::=
{: RESULT = format; :}
;
opt_file_compress_type ::=
/* Empty */
{: RESULT = null; :}
| KW_COMPRESS_TYPE KW_AS ident_or_text:compress_type
{: RESULT = compress_type; :}
;
opt_columns_from_path ::=
/* Empty */
{: RESULT = null; :}


@@ -28,6 +28,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
@@ -173,22 +174,22 @@ public class DataDescription implements InsertStmt.DataDesc {
}
public DataDescription(String tableName,
PartitionNames partitionNames,
List<String> filePaths,
List<String> columns,
Separator columnSeparator,
String fileFormat,
List<String> columnsFromPath,
boolean isNegative,
List<Expr> columnMappingList,
Expr fileFilterExpr,
Expr whereExpr,
LoadTask.MergeType mergeType,
Expr deleteCondition,
String sequenceColName,
Map<String, String> properties) {
PartitionNames partitionNames,
List<String> filePaths,
List<String> columns,
Separator columnSeparator,
String fileFormat,
List<String> columnsFromPath,
boolean isNegative,
List<Expr> columnMappingList,
Expr fileFilterExpr,
Expr whereExpr,
LoadTask.MergeType mergeType,
Expr deleteCondition,
String sequenceColName,
Map<String, String> properties) {
this(tableName, partitionNames, filePaths, columns, columnSeparator, null,
fileFormat, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr,
fileFormat, null, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr,
mergeType, deleteCondition, sequenceColName, properties);
}
@@ -199,6 +200,7 @@ public class DataDescription implements InsertStmt.DataDesc {
Separator columnSeparator,
Separator lineDelimiter,
String fileFormat,
String compressType,
List<String> columnsFromPath,
boolean isNegative,
List<Expr> columnMappingList,
@@ -215,6 +217,7 @@ public class DataDescription implements InsertStmt.DataDesc {
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
this.fileFormat = fileFormat;
this.compressType = Util.getFileCompressType(compressType);
this.columnsFromPath = columnsFromPath;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
@@ -362,8 +365,8 @@ public class DataDescription implements InsertStmt.DataDesc {
}
public static void validateMappingFunction(String functionName, List<String> args,
Map<String, String> columnNameMap,
Column mappingColumn, boolean isHadoopLoad) throws AnalysisException {
Map<String, String> columnNameMap,
Column mappingColumn, boolean isHadoopLoad) throws AnalysisException {
if (functionName.equalsIgnoreCase("alignment_timestamp")) {
validateAlignmentTimestamp(args, columnNameMap);
} else if (functionName.equalsIgnoreCase("strftime")) {
@@ -1050,6 +1053,13 @@ public class DataDescription implements InsertStmt.DataDesc {
if (isAnalyzed) {
return;
}
checkLoadPriv(fullDbName);
checkMergeType();
analyzeWithoutCheckPriv(fullDbName);
isAnalyzed = true;
}
private void checkMergeType() throws AnalysisException {
if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE.");
}
@@ -1059,24 +1069,32 @@ public class DataDescription implements InsertStmt.DataDesc {
if (mergeType != LoadTask.MergeType.APPEND && isNegative) {
throw new AnalysisException("not support MERGE or DELETE with NEGATIVE.");
}
checkLoadPriv(fullDbName);
analyzeWithoutCheckPriv(fullDbName);
if (isNegative && mergeType != LoadTask.MergeType.APPEND) {
throw new AnalysisException("Negative is only used when merge type is append.");
}
isAnalyzed = true;
}
public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException {
analyzeFilePaths();
analyzeLoadAttributes();
analyzeColumns();
analyzeMultiLoadColumns();
analyzeSequenceCol(fullDbName);
if (properties != null) {
analyzeProperties();
}
}
private void analyzeFilePaths() throws AnalysisException {
if (!isLoadFromTable()) {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
}
for (int i = 0; i < filePaths.size(); ++i) {
filePaths.set(i, filePaths.get(i).trim());
}
filePaths.replaceAll(String::trim);
}
}
private void analyzeLoadAttributes() throws AnalysisException {
if (columnSeparator != null) {
columnSeparator.analyze();
}
@@ -1089,12 +1107,18 @@ public class DataDescription implements InsertStmt.DataDesc {
partitionNames.analyze(null);
}
analyzeColumns();
analyzeMultiLoadColumns();
analyzeSequenceCol(fullDbName);
if (properties != null) {
analyzeProperties();
// file format
// note(tsy): for historical reason, file format here must be string type rather than TFileFormatType
if (fileFormat != null) {
if (!fileFormat.equalsIgnoreCase("parquet")
&& !fileFormat.equalsIgnoreCase(FeConstants.csv)
&& !fileFormat.equalsIgnoreCase("orc")
&& !fileFormat.equalsIgnoreCase("json")
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
&& !fileFormat.equalsIgnoreCase("hive_text")) {
throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
}
}
}
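
The format whitelist that used to live in BrokerFileGroup (removed further down) is now enforced here during analysis. For reference, the chain of equalsIgnoreCase calls above amounts to a case-insensitive membership test against the set below; the literal values of the FeConstants.csv* fields are an assumption for illustration.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FormatWhitelistSketch {
    // Assumed literals for FeConstants.csv, csv_with_names and csv_with_names_and_types.
    private static final Set<String> SUPPORTED = new HashSet<>(Arrays.asList(
            "parquet", "orc", "json", "hive_text",
            "csv", "csv_with_names", "csv_with_names_and_types"));

    static boolean isSupportedFormat(String fileFormat) {
        // null means "not declared"; the analyzer skips the check in that case.
        return fileFormat == null || SUPPORTED.contains(fileFormat.toLowerCase());
    }

    public static void main(String[] args) {
        System.out.println(isSupportedFormat("CSV_WITH_NAMES")); // true
        System.out.println(isSupportedFormat("avro"));           // false -> AnalysisException above
    }
}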


@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
@@ -50,6 +51,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
@@ -534,29 +536,38 @@ public class Util {
@NotNull
public static TFileFormatType getFileFormatType(String path) {
public static TFileFormatType getFileFormatTypeFromPath(String path) {
String lowerCasePath = path.toLowerCase();
if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) {
if (lowerCasePath.contains(".parquet") || lowerCasePath.contains(".parq")) {
return TFileFormatType.FORMAT_PARQUET;
} else if (lowerCasePath.endsWith(".gz")) {
return TFileFormatType.FORMAT_CSV_GZ;
} else if (lowerCasePath.endsWith(".bz2")) {
return TFileFormatType.FORMAT_CSV_BZ2;
} else if (lowerCasePath.endsWith(".lz4")) {
return TFileFormatType.FORMAT_CSV_LZ4FRAME;
} else if (lowerCasePath.endsWith(".lzo")) {
return TFileFormatType.FORMAT_CSV_LZOP;
} else if (lowerCasePath.endsWith(".lzo_deflate")) {
return TFileFormatType.FORMAT_CSV_LZO;
} else if (lowerCasePath.endsWith(".deflate")) {
return TFileFormatType.FORMAT_CSV_DEFLATE;
} else if (lowerCasePath.endsWith(".snappy")) {
return TFileFormatType.FORMAT_CSV_SNAPPYBLOCK;
} else if (lowerCasePath.contains(".orc")) {
return TFileFormatType.FORMAT_ORC;
} else if (lowerCasePath.contains(".json")) {
return TFileFormatType.FORMAT_JSON;
} else {
return TFileFormatType.FORMAT_CSV_PLAIN;
}
}
public static TFileFormatType getFileFormatTypeFromName(String formatName) {
String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase();
if (lowerFileFormat.equals("parquet")) {
return TFileFormatType.FORMAT_PARQUET;
} else if (lowerFileFormat.equals("orc")) {
return TFileFormatType.FORMAT_ORC;
} else if (lowerFileFormat.equals("json")) {
return TFileFormatType.FORMAT_JSON;
// csv/csv_with_name/csv_with_names_and_types treat as csv format
} else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
|| lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
|| lowerFileFormat.equals(FeConstants.text)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else {
return TFileFormatType.FORMAT_UNKNOWN;
}
}
/**
* Infer {@link TFileCompressType} from file name.
*
@@ -585,6 +596,9 @@ public class Util {
}
public static TFileCompressType getFileCompressType(String compressType) {
if (Strings.isNullOrEmpty(compressType)) {
return TFileCompressType.UNKNOWN;
}
final String upperCaseType = compressType.toUpperCase();
return TFileCompressType.valueOf(upperCaseType);
}
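
With the whitelist checks removed from the planners, format resolution is now split into two helpers: getFileFormatTypeFromName maps an explicitly declared format string and reports FORMAT_UNKNOWN instead of throwing (callers turn that into their own "Not supported file format" error), while getFileFormatTypeFromPath keeps inferring both format and compression from the file name when nothing was declared. A small sketch of the expected mapping, assuming the FeConstants.csv* fields hold the literal names used below; the path is illustrative.

import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TFileFormatType;

public class FormatTypeSketch {
    public static void main(String[] args) {
        // Explicit format names from FORMAT AS ... in the load statement.
        System.out.println(Util.getFileFormatTypeFromName("parquet"));         // FORMAT_PARQUET
        System.out.println(Util.getFileFormatTypeFromName("csv_with_names"));  // FORMAT_CSV_PLAIN
        System.out.println(Util.getFileFormatTypeFromName("avro"));            // FORMAT_UNKNOWN

        // No explicit format: fall back to sniffing the path.
        System.out.println(Util.getFileFormatTypeFromPath("/user/load/part-0.csv.gz")); // FORMAT_CSV_GZ
    }
}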


@@ -34,7 +34,6 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -214,16 +213,6 @@ public class BrokerFileGroup implements Writable {
escape = dataDescription.getEscape();
fileFormat = dataDescription.getFileFormat();
if (fileFormat != null) {
if (!fileFormat.equalsIgnoreCase("parquet") && !fileFormat.equalsIgnoreCase(FeConstants.csv)
&& !fileFormat.equalsIgnoreCase("orc")
&& !fileFormat.equalsIgnoreCase("json")
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
&& !fileFormat.equalsIgnoreCase("hive_text")) {
throw new DdlException("File Format Type " + fileFormat + " is invalid.");
}
}
columnSeparator = dataDescription.getColumnSeparator();
if (columnSeparator == null) {
if (fileFormat != null && fileFormat.equalsIgnoreCase("hive_text")) {


@@ -24,7 +24,6 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
@@ -207,6 +206,9 @@ public class FileGroupInfo {
// header_type
TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
context.params.setFormatType(formatType);
context.params.setCompressType(
Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path)
);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
context.fileGroup.getColumnNamesFromPath());
// Assign scan range locations only for broker load.
@@ -293,26 +295,15 @@ public class FileGroupInfo {
}
private TFileFormatType formatType(String fileFormat, String path) throws UserException {
if (fileFormat != null) {
if (fileFormat.equalsIgnoreCase("parquet")) {
return TFileFormatType.FORMAT_PARQUET;
} else if (fileFormat.equalsIgnoreCase("orc")) {
return TFileFormatType.FORMAT_ORC;
} else if (fileFormat.equalsIgnoreCase("json")) {
return TFileFormatType.FORMAT_JSON;
// csv/csv_with_name/csv_with_names_and_types treat as csv format
} else if (fileFormat.equalsIgnoreCase(FeConstants.csv) || fileFormat.toLowerCase()
.equals(FeConstants.csv_with_names) || fileFormat.toLowerCase()
.equals(FeConstants.csv_with_names_and_types)
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
|| fileFormat.equalsIgnoreCase(FeConstants.text)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else {
throw new UserException("Not supported file format: " + fileFormat);
}
if (fileFormat == null) {
// get file format by the file path
return Util.getFileFormatTypeFromPath(path);
}
return Util.getFileFormatType(path);
TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat);
if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
throw new UserException("Not supported file format: " + fileFormat);
}
return formatType;
}
private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,


@@ -88,7 +88,7 @@ public class LoadScanProvider {
ctx.timezone = analyzer.getTimezone();
TFileScanRangeParams params = new TFileScanRangeParams();
params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat(), ""));
params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat()));
params.setCompressType(fileGroupInfo.getFileGroup().getCompressType());
params.setStrictMode(fileGroupInfo.isStrictMode());
if (fileGroupInfo.getFileGroup().getFileFormat() != null
@@ -211,7 +211,7 @@ public class LoadScanProvider {
List<Integer> srcSlotIds = Lists.newArrayList();
Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat(), ""), fileGroupInfo.getHiddenColumns(),
formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(),
fileGroupInfo.isPartialUpdate());
int columnCountFromPath = 0;
@@ -242,28 +242,16 @@ public class LoadScanProvider {
.equalsIgnoreCase(Column.DELETE_SIGN);
}
private TFileFormatType formatType(String fileFormat, String path) throws UserException {
if (fileFormat != null) {
String lowerFileFormat = fileFormat.toLowerCase();
if (lowerFileFormat.equals("parquet")) {
return TFileFormatType.FORMAT_PARQUET;
} else if (lowerFileFormat.equals("orc")) {
return TFileFormatType.FORMAT_ORC;
} else if (lowerFileFormat.equals("json")) {
return TFileFormatType.FORMAT_JSON;
// csv/csv_with_name/csv_with_names_and_types treat as csv format
} else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
|| lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
|| lowerFileFormat.equals(FeConstants.text)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else {
throw new UserException("Not supported file format: " + fileFormat);
}
} else {
// get file format by the suffix of file
return Util.getFileFormatType(path);
private TFileFormatType formatType(String fileFormat) throws UserException {
if (fileFormat == null) {
// get file format by the file path
return TFileFormatType.FORMAT_CSV_PLAIN;
}
TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat);
if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
throw new UserException("Not supported file format: " + fileFormat);
}
return formatType;
}
public TableIf getTargetTable() {


@@ -235,6 +235,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR));
keywordMap.put("force", new Integer(SqlParserSymbols.KW_FORCE));
keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT));
keywordMap.put("compress_type", new Integer(SqlParserSymbols.KW_COMPRESS_TYPE));
keywordMap.put("free", new Integer(SqlParserSymbols.KW_FREE));
keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM));
keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND));