[fix](Tvf) return empty set when tvf queries an empty file or an error uri (#25280)
### Before

Return errors when tvf queries an empty file or an error uri:

1. `get parsed schema failed, empty csv file`
2. `Can not get first file, please check uri.`

### Now

We just return an empty set when tvf queries an empty file or an error uri.

```sql
mysql> select * from s3(
           "uri" = "https://error_uri/exp_1.csv",
           "s3.access_key"= "xx",
           "s3.secret_key" = "yy",
           "format" = "csv") limit 10;
Empty set (1.29 sec)
```
@@ -193,3 +193,9 @@ FROM s3(
     "use_path_style" = "true");
 ```
 
+### Note
+
+1. If the URI specified by the `S3 / HDFS` TVF does not match any file, or all the matched files are empty, the `S3 / HDFS` TVF will return an empty result set. In this case, using `DESC FUNCTION` to view the schema of this file, you will get a dummy column `__dummy_col`, which can be ignored.
+
+2. If the format of the TVF is specified as `CSV`, and the file read is not an empty file but its first line is empty, the error `The first line is empty, can not parse column numbers` will be prompted, because the schema cannot be parsed from the first line of the file.
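A quick way to observe the dummy column described in note 1 is `DESC FUNCTION`. A minimal sketch, assuming the same non-matching uri as in the description and placeholder credentials; the `__dummy_col` row mirrors the expected output recorded in the regression-test `.out` files further down:

```sql
mysql> desc function s3(
           "uri" = "https://error_uri/exp_1.csv",
           "s3.access_key" = "xx",
           "s3.secret_key" = "yy",
           "format" = "csv");
+-------------+------+------+-------+---------+-------+
| Field       | Type | Null | Key   | Default | Extra |
+-------------+------+------+-------+---------+-------+
| __dummy_col | TEXT | Yes  | false | \N      | NONE  |
+-------------+------+------+-------+---------+-------+
```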
@@ -51,7 +51,7 @@ hdfs(
 
 Related parameters for accessing hdfs:
 
-- `uri`: (required) hdfs uri.
+- `uri`: (required) hdfs uri. If the uri path does not exist or the files are empty files, hdfs tvf will return an empty result set.
 - `fs.defaultFS`: (required)
 - `hadoop.username`: (required) Can be any string, but cannot be empty.
 - `hadoop.security.authentication`: (optional)
@@ -63,6 +63,7 @@ Related parameters for accessing S3:
 
 > Note: URI currently supports three SCHEMA: http://, https:// and s3://.
 > 1. If you use http:// or https://, you will decide whether to use the 'path style' to access s3 based on the 'use_path_style' parameter
 > 2. If you use s3://, you will use the 'virtual-hosted style' to access the s3, the 'use_path_style' parameter is invalid.
+> 3. If the uri path does not exist or the files are empty files, s3 tvf will return an empty result set.
 >
 > For detailed use cases, you can refer to Best Practice at the bottom.
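A hedged illustration of the three URI schemas above; the endpoint, bucket, and credentials are placeholders, not values from this PR:

```sql
-- http:// or https://: 'use_path_style' decides the access style,
-- e.g. path style (http://endpoint/bucket/key) when set to "true"
select * from s3(
    "uri" = "http://my_endpoint/my_bucket/data.csv",
    "s3.access_key" = "xx",
    "s3.secret_key" = "yy",
    "format" = "csv",
    "use_path_style" = "true");

-- s3://: always virtual-hosted style; 'use_path_style' is ignored
select * from s3(
    "uri" = "s3://my_bucket/data.csv",
    "s3.access_key" = "xx",
    "s3.secret_key" = "yy",
    "format" = "csv");
```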
@@ -193,3 +193,9 @@ FROM s3(
     "use_path_style" = "true");
 ```
 
+### Note
+
+1. If the uri specified by the `S3 / hdfs` tvf does not match any file, or all the matched files are empty, the `S3 / hdfs` tvf will return an empty result set. In this case, using `DESC FUNCTION` to view the schema of this file, you will get a dummy column `__dummy_col`, which can be ignored.
+
+2. If the format of the tvf is specified as csv, and the file read is not empty but its first line is empty, the error `The first line is empty, can not parse column numbers` will be prompted, because the schema cannot be parsed from the first line of the file.
@@ -53,7 +53,7 @@ hdfs(
 **Parameter description**
 
 Parameters for accessing hdfs:
-- `uri`: (required) The uri for accessing hdfs.
+- `uri`: (required) The uri for accessing hdfs. If the uri path does not exist or all the files are empty files, hdfs tvf will return an empty result set.
 - `fs.defaultFS`: (required)
 - `hadoop.username`: (required) Can be any string, but cannot be empty.
 - `hadoop.security.authentication`: (optional)
@@ -66,6 +66,7 @@ Each parameter in the S3 tvf is a `"key"="value"` pair.
 
 > Note: the uri currently supports three schemas: http://, https:// and s3://
 > 1. If you use http:// or https://, the 'use_path_style' parameter decides whether the 'path style' is used to access s3
 > 2. If you use s3://, the 'virtual-hosted style' is always used to access s3, and the 'use_path_style' parameter is invalid.
+> 3. If the uri path does not exist or all the files are empty files, s3 tvf will return an empty result set
 >
 > For detailed use cases, refer to Best Practice at the bottom.
@@ -344,22 +344,27 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
         try {
             PFetchTableSchemaRequest request = getFetchTableStructureRequest();
-            Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
-                    .fetchTableStructureAsync(address, request);
+            InternalService.PFetchTableSchemaResult result = null;
 
-            InternalService.PFetchTableSchemaResult result = future.get();
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            String errMsg;
-            if (code != TStatusCode.OK) {
-                if (!result.getStatus().getErrorMsgsList().isEmpty()) {
-                    errMsg = result.getStatus().getErrorMsgsList().get(0);
-                } else {
-                    errMsg = "fetchTableStructureAsync failed. backend address: "
-                            + address.getHostname() + ":" + address.getPort();
+            // `request == null` means we don't need to get schemas from BE,
+            // and we fill a dummy col for this table.
+            if (request != null) {
+                Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
+                        .fetchTableStructureAsync(address, request);
+
+                result = future.get();
+                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                String errMsg;
+                if (code != TStatusCode.OK) {
+                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                        errMsg = result.getStatus().getErrorMsgsList().get(0);
+                    } else {
+                        errMsg = "fetchTableStructureAsync failed. backend address: "
+                                + address.getHostname() + ":" + address.getPort();
+                    }
+                    throw new AnalysisException(errMsg);
                 }
-                throw new AnalysisException(errMsg);
             }
 
             fillColumns(result);
         } catch (RpcException e) {
             throw new AnalysisException("fetchTableStructureResult rpc exception", e);
@@ -431,10 +436,12 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         return Pair.of(type, parsedNodes);
     }
 
-    private void fillColumns(InternalService.PFetchTableSchemaResult result)
-            throws AnalysisException {
-        if (result.getColumnNums() == 0) {
-            throw new AnalysisException("The amount of column is 0");
+    private void fillColumns(InternalService.PFetchTableSchemaResult result) {
+        // `result == null` means we don't need to get schemas from BE,
+        // and we fill a dummy col for this table.
+        if (result == null) {
+            columns.add(new Column("__dummy_col", ScalarType.createStringType(), true));
+            return;
         }
         // add fetched file columns
         for (int idx = 0; idx < result.getColumnNums(); ++idx) {
@@ -450,7 +457,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         }
     }
 
-    private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException {
+    private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
         fileScanRangeParams.setFormatType(fileFormatType);
@@ -475,14 +482,19 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         // get first file, used to parse table schema
         TBrokerFileStatus firstFile = null;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
-            if (fileStatus.isIsDir()) {
+            if (fileStatus.isIsDir() || fileStatus.size == 0) {
                 continue;
             }
             firstFile = fileStatus;
             break;
         }
+
+        // `firstFile == null` means:
+        // 1. No matching file path exists
+        // 2. All matched files have a size of 0
+        // For these two situations, we don't need to get schema from BE
         if (firstFile == null) {
-            throw new AnalysisException("Can not get first file, please check uri.");
+            return null;
         }
 
         // set TFileRangeDesc
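Taken together, the FE changes above mean a wildcard uri that matches a mix of empty and non-empty files silently skips the empty ones. A minimal sketch with placeholder bucket, endpoint, and credentials; the rows mirror the expected output recorded in the regression tests below:

```sql
-- empty_file_test*.csv matches one empty file and two files with data;
-- only the rows from the non-empty files are returned
mysql> select * from s3(
           "uri" = "http://my_bucket.my_endpoint/path/empty_file_test*.csv",
           "s3.access_key" = "xx",
           "s3.secret_key" = "yy",
           "format" = "csv") order by c1;
+------+---------+------+
| c1   | c2      | c3   |
+------+---------+------+
| 1    | doris   | 18   |
| 2    | nereids | 20   |
| 3    | xxx     | 22   |
| 4    | yyy     | 21   |
+------+---------+------+
```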
@@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --

-- !desc1 --
__dummy_col	TEXT	Yes	false	\N	NONE

regression-test/data/load_p0/tvf/test_tvf_empty_file.out (new file, 17 lines)
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --

-- !desc --
__dummy_col	TEXT	Yes	false	\N	NONE

-- !select2 --
1	doris	18
2	nereids	20
3	xxx	22
4	yyy	21

-- !des2 --
c1	TEXT	Yes	false	\N	NONE
c2	TEXT	Yes	false	\N	NONE
c3	TEXT	Yes	false	\N	NONE

regression-test/data/load_p0/tvf/test_tvf_error_url.out (new file, 11 lines)
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --

-- !desc --
__dummy_col	TEXT	Yes	false	\N	NONE

-- !select2 --

-- !desc2 --
__dummy_col	TEXT	Yes	false	\N	NONE

@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hdfs_tvf_error_uri","external,hive,tvf,external_docker") {
    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

    // It's okay to use a random `hdfsUser`, but it can not be empty.
    def hdfsUserName = "doris"
    def format = "csv"
    def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
    def uri = ""

    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // test csv format
        uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/no_exist_file.csv"
        format = "csv"
        order_qt_select1 """ select * from HDFS(
                            "uri" = "${uri}",
                            "hadoop.username" = "${hdfsUserName}",
                            "format" = "${format}"); """

        order_qt_desc1 """ desc function HDFS(
                            "uri" = "${uri}",
                            "hadoop.username" = "${hdfsUserName}",
                            "format" = "${format}"); """
    }
}
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_empty_file", "p0") {
    String ak = getS3AK()
    String sk = getS3SK()
    String s3_endpoint = getS3Endpoint()
    String region = getS3Region()
    String bucket = context.config.otherConfigs.get("s3BucketName");

    String path = "regression/datalake"

    // ${path}/empty_file_test.csv is an empty file,
    // so it should return an empty set.
    order_qt_select """ SELECT * FROM S3 (
                        "uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """

    order_qt_desc """ desc function S3 (
                        "uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """

    // ${path}/empty_file_test*.csv matches 3 files:
    // empty_file_test.csv, empty_file_test_1.csv, empty_file_test_2.csv.
    // empty_file_test.csv is an empty file, but
    // empty_file_test_1.csv and empty_file_test_2.csv have data,
    // so it should return the data of empty_file_test_1.csv and empty_file_test_2.csv
    order_qt_select2 """ SELECT * FROM S3 (
                        "uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test*.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    ) order by c1;
                    """

    order_qt_des2 """ desc function S3 (
                        "uri" = "http://${bucket}.${s3_endpoint}/${path}/empty_file_test*.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """
}
regression-test/suites/load_p0/tvf/test_tvf_error_url.groovy (new file, 61 lines)
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_error_url", "p0") {
    String ak = getS3AK()
    String sk = getS3SK()
    String s3_endpoint = getS3Endpoint()
    String region = getS3Region()
    String bucket = context.config.otherConfigs.get("s3BucketName");

    String path = "select_tvf/no_exists_file_test"

    order_qt_select """ SELECT * FROM S3 (
                        "uri" = "http://${s3_endpoint}/${bucket}/${path}/no_exist_file1.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """

    order_qt_desc """ desc function S3 (
                        "uri" = "http://${s3_endpoint}/${bucket}/${path}/no_exist_file1.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """

    order_qt_select2 """ SELECT * FROM S3 (
                        "uri" = "http://${s3_endpoint}/${bucket}/${path}/*.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """

    order_qt_desc2 """ desc function S3 (
                        "uri" = "http://${s3_endpoint}/${bucket}/${path}/*.csv",
                        "ACCESS_KEY"= "${ak}",
                        "SECRET_KEY" = "${sk}",
                        "format" = "csv",
                        "region" = "${region}"
                    );
                    """
}