From 9535ed01aa0436f5fa658de000c85965f12f38fd Mon Sep 17 00:00:00 2001
From: Weijie Guo
Date: Tue, 16 May 2023 08:50:43 +0800
Subject: [PATCH] [feature](tvf) Support compressed files for tvf hdfs() and
 s3() (#19530)

We support this by adding a new property to the tvf, e.g.:

`select * from hdfs("uri" = "xxx", ..., "compress_type" = "lz4frame", ...)`

Users can either:
1. specify the compression type explicitly by setting `"compress_type" = "xxx"`, or
2. let Doris infer the compression type from the file name suffix (e.g. `file1.gz`).

Currently we only support reading compressed files in `csv` format. The BE side
already supports this, so all that remains is to analyze `"compress_type"` on the
FE side and pass it to the BE.
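For concreteness, a hypothetical pair of queries exercising both paths (the
namenode address, path, and username below are placeholders, not part of this
patch):

```sql
-- Explicit: name the codec via compress_type.
select * from hdfs(
        "uri" = "hdfs://host:8020/path/to/all_types_compressed.csv.gz",
        "fs.defaultFS" = "hdfs://host:8020",
        "hadoop.username" = "doris",
        "format" = "csv",
        "compress_type" = "gz");

-- Inferred: leave compress_type at its UNKNOWN default and let the .gz suffix decide.
select * from hdfs(
        "uri" = "hdfs://host:8020/path/to/all_types_compressed.csv.gz",
        "fs.defaultFS" = "hdfs://host:8020",
        "hadoop.username" = "doris",
        "format" = "csv");
```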
---
 .../all_types_compressed.csv.gz                    | Bin 0 -> 127 bytes
 .../sql-functions/table-functions/hdfs.md          |  1 +
 .../sql-functions/table-functions/s3.md            |  1 +
 .../sql-functions/table-functions/hdfs.md          |  1 +
 .../sql-functions/table-functions/s3.md            |  1 +
 .../org/apache/doris/common/util/Util.java         | 21 ++++++++++++++-
 .../planner/external/FileQueryScanNode.java        |  2 +-
 .../doris/planner/external/TVFScanNode.java        |  8 ++++++
 .../ExternalFileTableValuedFunction.java           | 15 ++++++++++-
 .../table_valued_function/test_hdfs_tvf.out        | 24 +++++++++++++++++
 .../test_hdfs_tvf.groovy                           | 25 +++++++++++++++---
 11 files changed, 93 insertions(+), 6 deletions(-)
 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/csv_format_test/all_types_compressed.csv.gz

diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/csv_format_test/all_types_compressed.csv.gz b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/csv_format_test/all_types_compressed.csv.gz
new file mode 100644
index 0000000000000000000000000000000000000000..6e61b2ab878ac259a78ff98e6a2619cae8e5476b
GIT binary patch
literal 127
zcmb2|=HR&YH713DIWZ?EzNE4swKzUGKewPLwYWGnMK8IyjA8AWoq`Pp0?rrbXlXdS
zskpv<_r{&S^BWpFn3nu(T=YEQ14oC)qzS5vJsjn=xBWd(dH1jMnVHdF-j=6dU3M*L
j?v%q4?H^^-3TB*06ly;yv#el-SEAh1H5Ii-nHU%VdqO+?

literal 0
HcmV?d00001

diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md b/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md
index 578313f297..7e259bc639 100644
--- a/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md
@@ -66,6 +66,7 @@ File format parameters:
 - `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
 - `column_separator`: (optional) default `,`.
 - `line_delimiter`: (optional) default `\n`.
+- `compress_type`: (optional) Currently support `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which automatically infers the type from the suffix of `uri`.
 
 The following 6 parameters are used for loading in json format. For specific usage methods, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)
 
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md
index 22dde6e210..5026946be7 100644
--- a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md
@@ -69,6 +69,7 @@ file format parameter:
 - `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
 - `column_separator`: (optional) default `,`.
 - `line_delimiter`: (optional) default `\n`.
+- `compress_type`: (optional) Currently support `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, which automatically infers the type from the suffix of `uri`.
 
 The following 6 parameters are used for loading in json format. For specific usage methods, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md
index 0384648e8c..65cb7f3cde 100644
--- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md
@@ -67,6 +67,7 @@ hdfs(
 - `format`:(必填) 目前支持 `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
 - `column_separator`:(选填) 列分割符, 默认为`,`。
 - `line_delimiter`:(选填) 行分割符,默认为`\n`。
+- `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 `UNKNOWN`, 将会根据 `uri` 的后缀自动推断类型。
 
 下面6个参数是用于json格式的导入,具体使用方法可以参照:[Json Load](../../../data-operate/import/import-way/load-json-format.md)
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md
index b7c735b586..3af41d4b4c 100644
--- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md
@@ -71,6 +71,7 @@ S3 tvf中的每一个参数都是一个 `"key"="value"` 对。
 - `format`:(必填) 目前支持 `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
 - `column_separator`:(选填) 列分割符, 默认为`,`。
 - `line_delimiter`:(选填) 行分割符,默认为`\n`。
+- `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 `UNKNOWN`, 将会根据 `uri` 的后缀自动推断类型。
 
 下面6个参数是用于json格式的导入,具体使用方法可以参照:[Json Load](../../../data-operate/import/import-way/load-json-format.md)
 
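Per the four doc changes above, `compress_type` applies to both hdfs() and s3().
The regression test in this patch only covers hdfs(), so here is a hedged s3()
sketch for illustration only — the endpoint, bucket, and credential values are
placeholders, and the upper-case credential keys follow the parameter style of
the existing s3() docs (an assumption, not something this patch touches):

```sql
select * from s3(
        "uri" = "http://127.0.0.1:9312/test-bucket/all_types_compressed.csv.gz",
        "ACCESS_KEY" = "minioadmin",
        "SECRET_KEY" = "minioadmin",
        "format" = "csv",
        "compress_type" = "gz");
```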
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 0fbc19596d..9e71c8c952 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -552,8 +552,13 @@ public class Util {
         }
     }
 
+    /**
+     * Infer {@link TFileCompressType} from the file name.
+     *
+     * @param path path of the file to be inferred.
+     */
     @NotNull
-    public static TFileCompressType getFileCompressType(String path) {
+    public static TFileCompressType inferFileCompressTypeByPath(String path) {
         String lowerCasePath = path.toLowerCase();
         if (lowerCasePath.endsWith(".gz")) {
             return TFileCompressType.GZ;
@@ -572,6 +577,20 @@ public class Util {
         }
     }
 
+    public static TFileCompressType getFileCompressType(String compressType) {
+        final String upperCaseType = compressType.toUpperCase();
+        return TFileCompressType.valueOf(upperCaseType);
+    }
+
+    /**
+     * Pass through the compressType if it is not {@link TFileCompressType#UNKNOWN}. Otherwise, return the
+     * type inferred from the path.
+     */
+    public static TFileCompressType getOrInferCompressType(TFileCompressType compressType, String path) {
+        return compressType == TFileCompressType.UNKNOWN
+                ? inferFileCompressTypeByPath(path.toLowerCase()) : compressType;
+    }
+
     public static boolean isCsvFormat(TFileFormatType fileFormatType) {
         return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
                 || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
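Note the precedence `getOrInferCompressType` encodes: an explicitly set type is
passed through unchanged, and only the `UNKNOWN` default falls back to suffix
inference. The tail of the inference helper is elided by the hunk above; our
reading of the existing code is that unrecognized suffixes fall through to
`PLAIN`, so a compressed file without a recognized suffix needs the explicit
form, e.g. (placeholder URI):

```sql
-- data_without_suffix.csv actually holds gzip bytes; suffix inference would
-- treat it as PLAIN, so the codec must be named explicitly.
select * from hdfs(
        "uri" = "hdfs://host:8020/path/to/data_without_suffix.csv",
        "fs.defaultFS" = "hdfs://host:8020",
        "hadoop.username" = "doris",
        "format" = "csv",
        "compress_type" = "gz");
```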
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 3b5b1f8a3d..edd63e2bec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -334,7 +334,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
     }
 
     protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
-        return Util.getFileCompressType(fileSplit.getPath().toString());
+        return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
     }
 
     protected TFileAttributes getFileAttributes() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 67008f5d7a..9ec6c6c618 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -23,12 +23,14 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
@@ -83,6 +85,12 @@ public class TVFScanNode extends FileQueryScanNode {
         return tableValuedFunction.getTFileFormatType();
     }
 
+    @Override
+    protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
+        TFileCompressType fileCompressType = tableValuedFunction.getTFileCompressType();
+        return Util.getOrInferCompressType(fileCompressType, fileSplit.getPath().toString());
+    }
+
     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
         return tableValuedFunction.getTFileType();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 74ee6a855d..5d3aef2213 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
@@ -48,6 +49,7 @@ import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
@@ -95,6 +97,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
     protected static final String SKIP_LINES = "skip_lines";
     protected static final String CSV_SCHEMA = "csv_schema";
+    protected static final String COMPRESS_TYPE = "compress_type";
     // decimal(p,s)
     private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
     // datetime(p)
@@ -124,6 +127,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     protected Map<String, String> locationProperties;
 
     private TFileFormatType fileFormatType;
+    private TFileCompressType compressionType;
     private String headerType = "";
 
     private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
@@ -147,6 +151,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         return fileFormatType;
     }
 
+    public TFileCompressType getTFileCompressType() {
+        return compressionType;
+    }
+
     public Map<String, String> getLocationProperties() {
         return locationProperties;
     }
@@ -212,7 +220,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
         trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
         skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
-
+        try {
+            compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN"));
+        } catch (IllegalArgumentException e) {
+            throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported.");
+        }
         if (formatString.equals("csv") || formatString.equals("csv_with_names")
                 || formatString.equals("csv_with_names_and_types")) {
             parseCsvSchema(csvSchema, validParams);
@@ -451,6 +463,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
             throw new AnalysisException("Can not get first file, please check uri.");
         }
 
+        fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
         // set TFileRangeDesc
         TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
         fileRangeDesc.setPath(firstFile.getPath());
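A consequence of the try/catch above is that an unsupported `compress_type` now
fails during FE analysis instead of surfacing later on the BE. A sketch
(placeholder URI; `snappy` is not in the supported list documented above):

```sql
select * from hdfs(
        "uri" = "hdfs://host:8020/path/to/data.csv.gz",
        "fs.defaultFS" = "hdfs://host:8020",
        "hadoop.username" = "doris",
        "format" = "csv",
        "compress_type" = "snappy");
-- Fails analysis with: Compress type : snappy is not supported.
```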
diff --git a/regression-test/data/correctness_p0/table_valued_function/test_hdfs_tvf.out b/regression-test/data/correctness_p0/table_valued_function/test_hdfs_tvf.out
index b8bcccb3cd..6c269fc93c 100644
--- a/regression-test/data/correctness_p0/table_valued_function/test_hdfs_tvf.out
+++ b/regression-test/data/correctness_p0/table_valued_function/test_hdfs_tvf.out
@@ -151,6 +151,30 @@
 7 [1,2,3,4,5] \N \N \N \N \N \N \N \N \N
 8 [1,2,3,4,5] \N \N \N \N \N []] ]] [[] [[
 
+-- !csv_with_compress_type --
+0 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+1 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+2 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+3 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+4 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+5 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+6 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+7 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+8 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+9 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+
+-- !csv_infer_compress_type --
+0 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+1 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+2 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+3 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+4 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+5 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+6 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+7 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+8 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+9 2 3 4 5 6.6 7.7 8.8 abc def ghiaaaaaa 2020-10-10 2020-10-10 11:12:59
+
 -- !csv_names --
 1 alice 18
 2 bob 20
diff --git a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
index 5a283eafa2..162af3c722 100644
--- a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
+++ b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
@@ -27,7 +27,7 @@ suite("test_hdfs_tvf") {
 
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
-            // test csv foramt
+            // test csv format
            uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types.csv"
            format = "csv"
            qt_csv_all_types """ select * from HDFS(
@@ -51,7 +51,7 @@
                        "uri" = "${uri}",
                        "fs.defaultFS"= "${defaultFS}",
                        "hadoop.username" = "${hdfsUserName}",
-                        "format" = "csv",
+                        "format" = "${format}",
                        "column_separator" = "|") order by c1; """
 
            uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_normal.csv"
@@ -59,9 +59,28 @@
            qt_csv_array_normal """ select * from HDFS("uri" = "${uri}",
                        "fs.defaultFS"= "${defaultFS}",
                        "hadoop.username" = "${hdfsUserName}",
-                        "format" = "csv",
+                        "format" = "${format}",
                        "column_separator" = "|") order by c1; """
 
+            // test csv format with compress type
+            uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types_compressed.csv.gz"
+            format = "csv"
+            qt_csv_with_compress_type """ select * from HDFS(
+                        "uri" = "${uri}",
+                        "fs.defaultFS"= "${defaultFS}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "compress_type" = "GZ") order by c1; """
+
+            // test csv format infer compress type
+            uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types_compressed.csv.gz"
+            format = "csv"
+            qt_csv_infer_compress_type """ select * from HDFS(
+                        "uri" = "${uri}",
+                        "fs.defaultFS"= "${defaultFS}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}") order by c1; """
+
            // test csv_with_names file format
            uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/student_with_names.csv"
            format = "csv_with_names"