[feature](tvf) Support compressed files for tvf hdfs() and s3() (#19530)

We support this by adding a new property to the tvf, e.g.:

`select * from hdfs("uri" = "xxx", ..., "compress_type" = "lz4", ...)`

Users can either:

- specify the compression explicitly by setting `"compress_type" = "xxx"`, or
- let Doris infer the compression type from the file name suffix (e.g. `file1.gz`).
Currently, we only support reading compressed files in `csv` format, and the BE side already supports this.
All that is needed is to analyze `"compress_type"` on the FE side and pass it to the BE.
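
A minimal, self-contained sketch of that FE-side resolution (illustrative only; the class, the suffix mappings other than `.gz`, and the `PLAIN` fallback are assumptions rather than the actual Doris implementation):

```java
import java.util.Map;

public class CompressTypeResolutionSketch {
    enum CompressType { UNKNOWN, PLAIN, GZ, LZO, BZ2, LZ4FRAME, DEFLATE }

    // An explicit "compress_type" property wins; UNKNOWN (the default) falls
    // back to inferring the type from the file name suffix.
    static CompressType resolve(Map<String, String> tvfProps, String path) {
        CompressType explicit = CompressType.valueOf(
                tvfProps.getOrDefault("compress_type", "UNKNOWN").toUpperCase());
        if (explicit != CompressType.UNKNOWN) {
            return explicit;
        }
        String lower = path.toLowerCase();
        if (lower.endsWith(".gz")) {
            return CompressType.GZ;        // e.g. file1.gz -> GZ
        } else if (lower.endsWith(".bz2")) {
            return CompressType.BZ2;       // assumed mapping
        } else if (lower.endsWith(".lz4")) {
            return CompressType.LZ4FRAME;  // assumed mapping
        }
        return CompressType.PLAIN;         // assumed fallback for uncompressed files
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("compress_type", "GZ"), "s3://bucket/a.csv"));  // GZ
        System.out.println(resolve(Map.of(), "hdfs://ns/a.csv.gz"));                      // GZ (inferred)
    }
}
```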
Weijie Guo
2023-05-16 08:50:43 +08:00
committed by GitHub
parent 8284c342cb
commit 9535ed01aa
11 changed files with 93 additions and 6 deletions


@@ -66,6 +66,7 @@ File format parameters:
- `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
- `column_separator`: (optional) default `,`.
- `line_delimiter`: (optional) default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which means the type will be inferred automatically from the suffix of `uri`.
The following 6 parameters are used for loading in json format. For specific usage methods, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)
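
As a hedged usage sketch (not part of the documentation change above): reading a gzip-compressed CSV through the hdfs() tvf over JDBC. The connection URL, credentials, HDFS paths, and username are placeholders, and the MySQL JDBC driver is assumed to be on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TvfCompressTypeExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
             Statement stmt = conn.createStatement()) {
            // Explicit compression: the FE passes GZ through to the BE.
            // Dropping "compress_type" here would instead infer GZ from the ".gz" suffix.
            ResultSet rs = stmt.executeQuery(
                "select * from hdfs("
                    + "\"uri\" = \"hdfs://ns/path/all_types.csv.gz\", "
                    + "\"fs.defaultFS\" = \"hdfs://ns\", "
                    + "\"hadoop.username\" = \"doris\", "
                    + "\"format\" = \"csv\", "
                    + "\"compress_type\" = \"GZ\")");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```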


@@ -69,6 +69,7 @@ file format parameter:
- `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
- `column_separator`: (optional) default `,`.
- `line_delimiter`: (optional) default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which means the type will be inferred automatically from the suffix of `uri`.
The following 6 parameters are used for loading in json format. For specific usage methods, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)


@@ -67,6 +67,7 @@ hdfs(
- `format`: (required) Currently supports `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
- `column_separator`: (optional) column separator, default `,`.
- `line_delimiter`: (optional) line delimiter, default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which means the type will be inferred automatically from the suffix of `uri`.
The following 6 parameters are used for loading in json format. For specific usage, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)


@@ -71,6 +71,7 @@ Each parameter in the S3 tvf is a `"key"="value"` pair.
- `format`: (required) Currently supports `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
- `column_separator`: (optional) column separator, default `,`.
- `line_delimiter`: (optional) line delimiter, default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which means the type will be inferred automatically from the suffix of `uri`.
The following 6 parameters are used for loading in json format. For specific usage, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)


@@ -552,8 +552,13 @@ public class Util {
}
}
/**
* Infer {@link TFileCompressType} from file name.
*
* @param path path of the file whose compression type is to be inferred.
*/
@NotNull
public static TFileCompressType getFileCompressType(String path) {
public static TFileCompressType inferFileCompressTypeByPath(String path) {
String lowerCasePath = path.toLowerCase();
if (lowerCasePath.endsWith(".gz")) {
return TFileCompressType.GZ;
@@ -572,6 +577,20 @@ public class Util {
}
}
public static TFileCompressType getFileCompressType(String compressType) {
final String upperCaseType = compressType.toUpperCase();
return TFileCompressType.valueOf(upperCaseType);
}
/**
* Pass through the compressType if it is not {@link TFileCompressType#UNKNOWN}. Otherwise, return the
* inferred type from path.
*/
public static TFileCompressType getOrInferCompressType(TFileCompressType compressType, String path) {
return compressType == TFileCompressType.UNKNOWN
? inferFileCompressTypeByPath(path.toLowerCase()) : compressType;
}
public static boolean isCsvFormat(TFileFormatType fileFormatType) {
return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
|| fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE

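Illustrative-only usage of the two helpers above (not part of the commit), assuming the FE module and the generated thrift classes are on the classpath:

```java
import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TFileCompressType;

public class CompressTypeHelpersDemo {
    public static void main(String[] args) {
        // Suffix-based inference: ".gz" maps to GZ, as shown in the hunk above.
        System.out.println(Util.inferFileCompressTypeByPath("hdfs://ns/data/file1.gz"));  // GZ

        // An explicit type passes through unchanged ...
        System.out.println(Util.getOrInferCompressType(
                TFileCompressType.LZ4FRAME, "s3://bucket/data/file1"));                   // LZ4FRAME
        // ... while UNKNOWN falls back to suffix inference.
        System.out.println(Util.getOrInferCompressType(
                TFileCompressType.UNKNOWN, "s3://bucket/data/file1.gz"));                 // GZ
    }
}
```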

@@ -334,7 +334,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.getFileCompressType(fileSplit.getPath().toString());
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
}
protected TFileAttributes getFileAttributes() throws UserException {


@@ -23,12 +23,14 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@@ -83,6 +85,12 @@ public class TVFScanNode extends FileQueryScanNode {
return tableValuedFunction.getTFileFormatType();
}
@Override
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
TFileCompressType fileCompressType = tableValuedFunction.getTFileCompressType();
return Util.getOrInferCompressType(fileCompressType, fileSplit.getPath().toString());
}
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileType();


@@ -34,6 +34,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
@@ -48,6 +49,7 @@ import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
@@ -95,6 +97,7 @@ public abstract class ExternalFileTableValuedFunctio
protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
protected static final String SKIP_LINES = "skip_lines";
protected static final String CSV_SCHEMA = "csv_schema";
protected static final String COMPRESS_TYPE = "compress_type";
// decimal(p,s)
private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
// datetime(p)
@@ -124,6 +127,7 @@ public abstract class ExternalFileTableValuedFunctio
protected Map<String, String> locationProperties;
private TFileFormatType fileFormatType;
private TFileCompressType compressionType;
private String headerType = "";
private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
@@ -147,6 +151,7 @@ public abstract class ExternalFileTableValuedFunctio
return fileFormatType;
}
public TFileCompressType getTFileCompressType() {
return compressionType;
}
public Map<String, String> getLocationProperties() {
return locationProperties;
}
@@ -212,7 +220,11 @@ public abstract class ExternalFileTableValuedFunctio
fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
try {
compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN"));
} catch (IllegalArgumentException e) {
throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported.");
}
if (formatString.equals("csv") || formatString.equals("csv_with_names")
|| formatString.equals("csv_with_names_and_types")) {
parseCsvSchema(csvSchema, validParams);
@@ -451,6 +463,7 @@ public abstract class ExternalFileTableValuedFunctio
throw new AnalysisException("Can not get first file, please check uri.");
}
fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
// set TFileRangeDesc
TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
fileRangeDesc.setPath(firstFile.getPath());

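And a small illustrative snippet (not from the commit) of the validation path added above: `Util.getFileCompressType` upper-cases its argument, so valid names are case-insensitive, while anything that is not a `TFileCompressType` constant raises `IllegalArgumentException`, which the tvf turns into an `AnalysisException`. The FE classes are again assumed to be on the classpath.

```java
import org.apache.doris.common.util.Util;

public class CompressTypeValidationDemo {
    public static void main(String[] args) {
        System.out.println(Util.getFileCompressType("gz"));   // case-insensitive -> GZ
        try {
            Util.getFileCompressType("not_a_codec");           // not an enum constant
        } catch (IllegalArgumentException e) {
            // ExternalFileTableValuedFunction rethrows this as an AnalysisException.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```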

@@ -151,6 +151,30 @@
7 [1,2,3,4,5] \N \N \N \N \N \N \N \N \N
8 [1,2,3,4,5] \N \N \N \N \N []] ]] [[] [[
-- !csv_with_compress_type --
0 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
1 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
2 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
3 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
4 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
5 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
6 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
7 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
8 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
9 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
-- !csv_infer_compress_type --
0 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
1 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
2 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
3 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
4 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
5 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
6 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
7 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
8 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
9 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
-- !csv_names --
1 alice 18
2 bob 20


@@ -27,7 +27,7 @@ suite("test_hdfs_tvf") {
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
// test csv foramt
// test csv format
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types.csv"
format = "csv"
qt_csv_all_types """ select * from HDFS(
@@ -51,7 +51,7 @@ suite("test_hdfs_tvf") {
"uri" = "${uri}",
"fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "csv",
"format" = "${format}",
"column_separator" = "|") order by c1; """
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_normal.csv"
@@ -59,9 +59,28 @@ suite("test_hdfs_tvf") {
qt_csv_array_normal """ select * from HDFS("uri" = "${uri}",
"fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "csv",
"format" = "${format}",
"column_separator" = "|") order by c1; """
// test csv format with compress type
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types_compressed.csv.gz"
format = "csv"
qt_csv_with_compress_type """ select * from HDFS(
"uri" = "${uri}",
"fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"compress_type" = "GZ") order by c1; """
// test csv format infer compress type
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types_compressed.csv.gz"
format = "csv"
qt_csv_infer_compress_type """ select * from HDFS(
"uri" = "${uri}",
"fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}") order by c1; """
// test csv_with_names file format
uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/student_with_names.csv"
format = "csv_with_names"