[Enhancement](tvf) Table-valued function supports reading local files (#17404)

I tested the local tvf with TPC-H queries. First, generate the `lineitem` dataset with 6001215 rows, and load it into the `lineitem` table by:
```
insert into lineitem select c11, c1, c4, c2, c3, c5, c6, c7, c8, c9, c10, c12, c13, c14, c15, c16 
from local(
        "file_path" = "tools/tpch-tools/bin/tpch-data/lineitem.tbl.1", 
        "backend_id" = "10003", 
        "format" = "csv", 
        "column_separator" = "|"
);
```
Then run the TPC-H `q1` and `q16` queries; the query results are correct.
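
As a quick sanity check before running the queries, the loaded row count can be compared with the generated data (a minimal example; the expected value is the 6001215 rows mentioned above, assuming the whole generated dataset was loaded):
```
-- expected: 6001215, the number of generated rows noted above
select count(*) from lineitem;
```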

It can also analyze a BE's log directly, for example:

```
mysql> select * from local(
        "file_path" = "log/be.out",
        "backend_id" = "10006",
        "format" = "csv")
       where c1 like "%start_time%" limit 10;
+--------------------------------------------------------+
| c1                                                     |
+--------------------------------------------------------+
| start time: 2023年 08月 07日 星期一 23:20:32 CST       |
| start time: 2023年 08月 07日 星期一 23:32:10 CST       |
| start time: 2023年 08月 08日 星期二 00:20:50 CST       |
| start time: 2023年 08月 08日 星期二 00:29:15 CST       |
+--------------------------------------------------------+
```
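
Glob patterns in `file_path` work as well, so multiple log files can be scanned in one query; a sketch along the lines of the regression test added in this PR (backend id as in the example above):
```
select count(*) from local(
        "file_path" = "log/*.out",
        "backend_id" = "10006",
        "format" = "csv")
       where c1 like "%start_time%";
```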
Chuanle Chen committed (via GitHub) on 2023-08-10 20:07:42 +08:00
parent 879024a3a2, commit 71807ceb5f
25 changed files with 736 additions and 14 deletions

View File

@ -1054,6 +1054,9 @@ DEFINE_Bool(enable_hdfs_hedged_read, "false");
DEFINE_Int32(hdfs_hedged_read_thread_num, "128");
DEFINE_Int32(hdfs_hedged_read_threshold_time, "500");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");

View File

@ -1111,6 +1111,9 @@ DECLARE_Int32(hdfs_hedged_read_thread_num);
// Maybe overwritten by the value specified when creating catalog
DECLARE_Int32(hdfs_hedged_read_threshold_time);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);

View File

@ -53,5 +53,25 @@ std::string hdfs_error() {
return ss.str();
}
std::string glob_err_to_str(int code) {
    std::string msg;
    // https://sites.uclouvain.be/SystInfo/usr/include/glob.h.html
    switch (code) {
    case 1:
        msg = "Ran out of memory";
        break;
    case 2:
        msg = "read error";
        break;
    case 3:
        msg = "No matches found";
        break;
    default:
        msg = "unknown";
        break;
    }
    return fmt::format("({}), {}", code, msg);
}
} // namespace io
} // namespace doris

View File

@ -26,6 +26,7 @@ namespace io {
std::string errno_to_str();
std::string errcode_to_str(const std::error_code& ec);
std::string hdfs_error();
std::string glob_err_to_str(int code);
} // namespace io
} // namespace doris

View File

@ -19,6 +19,7 @@
#include <fcntl.h>
#include <fmt/format.h>
#include <glob.h>
#include <glog/logging.h>
#include <openssl/md5.h>
#include <sys/mman.h>
@ -428,5 +429,54 @@ const std::shared_ptr<LocalFileSystem>& global_local_filesystem() {
    return local_fs;
}

Status LocalFileSystem::canonicalize_local_file(const std::string& dir,
                                                const std::string& file_path,
                                                std::string* full_path) {
    const std::string absolute_path = dir + "/" + file_path;
    std::string canonical_path;
    RETURN_IF_ERROR(canonicalize(absolute_path, &canonical_path));
    if (!contain_path(dir, canonical_path)) {
        return Status::InvalidArgument("file path is not allowed: {}", canonical_path);
    }
    *full_path = canonical_path;
    return Status::OK();
}

Status LocalFileSystem::safe_glob(const std::string& path, std::vector<FileInfo>* res) {
    if (path.find("..") != std::string::npos) {
        return Status::InvalidArgument("can not contain '..' in path");
    }
    std::string full_path = config::user_files_secure_path + "/" + path;
    std::vector<std::string> files;
    RETURN_IF_ERROR(_glob(full_path, &files));
    for (auto& file : files) {
        FileInfo fi;
        fi.is_file = true;
        RETURN_IF_ERROR(canonicalize_local_file("", file, &(fi.file_name)));
        RETURN_IF_ERROR(file_size_impl(fi.file_name, &(fi.file_size)));
        res->push_back(std::move(fi));
    }
    return Status::OK();
}

Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::string>* res) {
    glob_t glob_result;
    memset(&glob_result, 0, sizeof(glob_result));

    int rc = glob(pattern.c_str(), GLOB_TILDE, NULL, &glob_result);
    if (rc != 0) {
        globfree(&glob_result);
        return Status::IOError("failed to glob {}: {}", pattern, glob_err_to_str(rc));
    }

    for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
        res->push_back(std::string(glob_result.gl_pathv[i]));
    }
    globfree(&glob_result);
    return Status::OK();
}
} // namespace io
} // namespace doris

View File

@ -72,6 +72,15 @@ public:
// read local file and save content to "content"
Status read_file_to_string(const Path& file, std::string* content);
    Status canonicalize_local_file(const std::string& dir, const std::string& file_path,
                                   std::string* full_path);

    // Glob-list the files matching the path pattern.
    // The result is saved in "res" as absolute paths with file sizes.
    // "safe" means the path is concatenated with the prefix config::user_files_secure_path,
    // so it cannot list any files outside config::user_files_secure_path.
    Status safe_glob(const std::string& path, std::vector<FileInfo>* res);

protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_impl(const FileDescription& file_desc, const Path& abs_path,
@ -97,6 +106,8 @@ protected:
Status delete_directory_or_file_impl(const Path& path);
private:
    // a wrapper for glob(); returns the file list in "res"
    Status _glob(const std::string& pattern, std::vector<std::string>* res);

    LocalFileSystem(Path&& root_path, std::string&& id = "");
};

View File

@ -74,7 +74,6 @@ void TabletSchemaCache::_recycle() {
}
}
_is_stopped = true;
LOG(INFO) << "xxx yyy stopped ";
}
} // namespace doris

View File

@ -59,6 +59,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "gutil/integral_types.h"
#include "http/http_client.h"
#include "io/fs/local_file_system.h"
#include "io/fs/stream_load_pipe.h"
#include "io/io_common.h"
#include "olap/data_dir.h"
@ -1636,4 +1637,25 @@ void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr
    ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request, response);
}

void PInternalServiceImpl::glob(google::protobuf::RpcController* controller,
                                const PGlobRequest* request, PGlobResponse* response,
                                google::protobuf::Closure* done) {
    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
        brpc::ClosureGuard closure_guard(done);
        std::vector<io::FileInfo> files;
        Status st = io::global_local_filesystem()->safe_glob(request->pattern(), &files);
        if (st.ok()) {
            for (auto& file : files) {
                PGlobResponse_PFileInfo* pfile = response->add_files();
                pfile->set_file(file.file_name);
                pfile->set_size(file.file_size);
            }
        }
        st.to_protobuf(response->mutable_status());
    });
    if (!ret) {
        offer_failed(response, done, _heavy_work_pool);
    }
}
} // namespace doris

View File

@ -186,6 +186,9 @@ public:
PReportStreamLoadStatusResponse* response,
google::protobuf::Closure* done) override;
    void glob(google::protobuf::RpcController* controller, const PGlobRequest* request,
              PGlobResponse* response, google::protobuf::Closure* done) override;

private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,

View File

@ -610,4 +610,34 @@ TEST_F(LocalFileSystemTest, TestRandomWrite) {
EXPECT_TRUE(file_reader->close().ok());
}
}
TEST_F(LocalFileSystemTest, TestGlob) {
    std::string path = "./be/ut_build_ASAN/test/file_path/";
    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok());
    EXPECT_TRUE(io::global_local_filesystem()
                        ->create_directory("./be/ut_build_ASAN/test/file_path/1")
                        .ok());
    EXPECT_TRUE(io::global_local_filesystem()
                        ->create_directory("./be/ut_build_ASAN/test/file_path/2")
                        .ok());
    EXPECT_TRUE(io::global_local_filesystem()
                        ->create_directory("./be/ut_build_ASAN/test/file_path/3")
                        .ok());

    save_string_file("./be/ut_build_ASAN/test/file_path/1/f1.txt", "just test");
    save_string_file("./be/ut_build_ASAN/test/file_path/1/f2.txt", "just test");
    save_string_file("./be/ut_build_ASAN/test/file_path/f3.txt", "just test");

    std::vector<io::FileInfo> files;
    EXPECT_FALSE(io::global_local_filesystem()->safe_glob("./../*.txt", &files).ok());
    EXPECT_FALSE(io::global_local_filesystem()->safe_glob("/*.txt", &files).ok());
    EXPECT_TRUE(io::global_local_filesystem()->safe_glob("./file_path/1/*.txt", &files).ok());
    EXPECT_EQ(2, files.size());
    files.clear();
    EXPECT_TRUE(io::global_local_filesystem()->safe_glob("./file_path/*/*.txt", &files).ok());
    EXPECT_EQ(2, files.size());

    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok());
}
} // namespace doris

View File

@ -1476,3 +1476,8 @@ Indicates how many tablets failed to load in the data directory. At the same tim
* Description: If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled. If false, cancel query when the memory used exceeds exec_mem_limit.
* Default value: true
#### `user_files_secure_path`
* Description: The storage directory for files queried by `local` table valued functions.
* Default value: `${DORIS_HOME}`
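
For example, with the default value a relative `file_path` passed to the `local` tvf resolves under `${DORIS_HOME}` (a sketch; the backend id is a placeholder):

```sql
-- reads ${DORIS_HOME}/log/be.out on backend 10003
select * from local(
    "file_path" = "log/be.out",
    "backend_id" = "10003",
    "format" = "csv");
```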

View File

@ -0,0 +1,150 @@
---
{
"title": "local",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## Local
### Name
<version since="dev">
local
</version>
### Description
The local table-valued function (tvf) allows users to read and access local file contents on a BE node, just like accessing a relational table. Currently the `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` file formats are supported.
It requires the `ADMIN` privilege to use.
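
For a non-admin user the privilege has to be granted first. A hedged sketch (the user name is a placeholder and the exact `GRANT` form may differ between Doris versions; the FE change in this PR accepts either ADMIN or OPERATOR):

```sql
GRANT ADMIN_PRIV ON *.*.* TO 'etl_user'@'%';
```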
#### syntax
```sql
local(
"file_path" = "path/to/file.txt",
"backend_id" = "be_id",
"format" = "csv",
"keyn" = "valuen"
...
);
```
**parameter description**

Related parameters for accessing a local file on the BE node:

- `file_path`:

  (required) The path of the file to be read, relative to the `user_files_secure_path` directory, where the `user_files_secure_path` parameter [can be configured on the BE](../../../admin-manual/config/be-config.md).

  The path cannot contain `..`. Glob syntax is supported to match multiple files, such as `log/*.log`.

- `backend_id`:

  (required) The id of the backend where the file resides. The `backend_id` can be obtained with the `show backends` command.

File format parameters:

- `format`: (required) Currently supports `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`.
- `column_separator`: (optional) The column separator, default `,`.
- `line_delimiter`: (optional) The line delimiter, default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, and the type is automatically inferred from the suffix of `uri`.

The following 6 parameters are used for loading in json format. For specific usage, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)

- `read_json_by_line`: (optional) default `"true"`
- `strip_outer_array`: (optional) default `"false"`
- `json_root`: (optional) default `""`
- `json_paths`: (optional) default `""`
- `num_as_string`: (optional) default `false`
- `fuzzy_parse`: (optional) default `false`

<version since="dev">The following 2 parameters are used for loading in csv format</version>

- `trim_double_quotes`: Boolean type (optional), default `false`. `true` means that the outermost double quotes of each field in the csv file are trimmed.
- `skip_lines`: Integer type (optional), default 0. Skips the first `skip_lines` lines of the csv file. This parameter is ignored when the format is `csv_with_names` or `csv_with_names_and_types`.
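
A hedged sketch combining several of the optional parameters above (the file pattern, backend id, and option values are placeholders):

```sql
select * from local(
    "file_path" = "data/*.csv.gz",
    "backend_id" = "10003",
    "format" = "csv_with_names",
    "column_separator" = "|",
    "compress_type" = "GZ",
    "trim_double_quotes" = "true");
```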
### Examples
Analyze the log file on the specified BE:
```sql
mysql> select * from local(
"file_path" = "log/be.out",
"backend_id" = "10006",
"format" = "csv")
where c1 like "%start_time%" limit 10;
+--------------------------------------------------------+
| c1 |
+--------------------------------------------------------+
| start time: 2023 08 07 星期一 23:20:32 CST |
| start time: 2023 08 07 星期一 23:32:10 CST |
| start time: 2023 08 08 星期二 00:20:50 CST |
| start time: 2023 08 08 星期二 00:29:15 CST |
+--------------------------------------------------------+
```
Read and access a csv file located at `${DORIS_HOME}/student.csv`:
```sql
mysql> select * from local(
"file_path" = "student.csv",
"backend_id" = "10003",
"format" = "csv");
+------+---------+--------+
| c1 | c2 | c3 |
+------+---------+--------+
| 1 | alice | 18 |
| 2 | bob | 20 |
| 3 | jack | 24 |
| 4 | jackson | 19 |
| 5 | liming | d18 |
+------+---------+--------+
```
It can be used together with `desc function`:
```sql
mysql> desc function local(
"file_path" = "student.csv",
"backend_id" = "10003",
"format" = "csv");
+-------+------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-------+---------+-------+
| c1 | TEXT | Yes | false | NULL | NONE |
| c2 | TEXT | Yes | false | NULL | NONE |
| c3 | TEXT | Yes | false | NULL | NONE |
+-------+------+------+-------+---------+-------+
```
### Keywords
local, table-valued-function, tvf
### Best Practice
For more detailed usage of the local tvf, please refer to the [S3](./s3.md) tvf; the only difference between them is the way of accessing the storage system.
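
For example, the local tvf can be combined with `insert into ... select` to load a local file into an existing Doris table, as in the test description of this PR (table name, path, and backend id are illustrative; use an explicit column list when the file's column order differs from the table's):

```sql
insert into lineitem
select * from local(
    "file_path" = "tools/tpch-tools/bin/tpch-data/lineitem.tbl.1",
    "backend_id" = "10003",
    "format" = "csv",
    "column_separator" = "|");
```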

View File

@ -722,6 +722,7 @@
"sql-manual/sql-functions/table-functions/explode-numbers-outer",
"sql-manual/sql-functions/table-functions/s3",
"sql-manual/sql-functions/table-functions/hdfs",
"sql-manual/sql-functions/table-functions/local",
"sql-manual/sql-functions/table-functions/iceberg-meta",
"sql-manual/sql-functions/table-functions/backends",
"sql-manual/sql-functions/table-functions/frontends",
@ -1296,4 +1297,4 @@
]
}
]
}
}

View File

@ -1505,3 +1505,8 @@ load tablets from header failed, failed tablets size: xxx, path=xxx
* Description: If true, query memory is not limited as long as memory usage does not exceed exec_mem_limit; when the process memory exceeds exec_mem_limit and is larger than 2GB, the query is cancelled. If false, the query is cancelled once the memory it uses exceeds exec_mem_limit.
* Default value: true
#### `user_files_secure_path`
* Description: The storage directory for files queried by the `local` table-valued function.
* Default value: `${DORIS_HOME}`

View File

@ -0,0 +1,147 @@
---
{
"title": "local",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## local
### Name
<version since="dev">
local
</version>
### Description
The local table-valued function (tvf) lets users read and access file contents on a BE node just like accessing a relational table. Currently the `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` file formats are supported.
It requires the ADMIN privilege to use.
#### syntax
```sql
local(
"file_path" = "path/to/file.txt",
"backend_id" = "be_id",
"format" = "csv",
"keyn" = "valuen"
...
);
```
**parameter description**

Related parameters for accessing a local file:

- `file_path`

  (required) The path of the file to be read, relative to the `user_files_secure_path` directory, where `user_files_secure_path` is a [BE configuration item](../../../admin-manual/config/be-config.md).

  The path cannot contain `..`. Glob syntax is supported to match multiple files, such as `logs/*.log`.

- `backend_id`:

  (required) The id of the BE node where the file resides. The `backend_id` can be obtained with the `show backends` command.

File format parameters:

- `format`: (required) Currently supports `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`.
- `column_separator`: (optional) The column separator, default `,`.
- `line_delimiter`: (optional) The line delimiter, default `\n`.
- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, and the type is automatically inferred from the suffix of `uri`.

The following 6 parameters are used for loading in json format. For specific usage, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md)

- `read_json_by_line`: (optional) default `"true"`
- `strip_outer_array`: (optional) default `"false"`
- `json_root`: (optional) default empty
- `json_paths`: (optional) default empty
- `num_as_string`: (optional) default `false`
- `fuzzy_parse`: (optional) default `false`

<version since="dev">The following 2 parameters are used for loading in csv format</version>

- `trim_double_quotes`: Boolean type (optional), default `false`. `true` means that the outermost double quotes of each field in the csv file are trimmed.
- `skip_lines`: Integer type (optional), default 0. Skips the first `skip_lines` lines of the csv file. This parameter is ignored when the format is `csv_with_names` or `csv_with_names_and_types`.
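
A hedged sketch of reading a json file with some of the json parameters above (the path, backend id, and json root are placeholders):

```sql
select * from local(
    "file_path" = "data/sample.json",
    "backend_id" = "10003",
    "format" = "json",
    "read_json_by_line" = "true",
    "json_root" = "$.records");
```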
### Examples
Analyze the log file on the specified BE:
```sql
mysql> select * from local(
"file_path" = "log/be.out",
"backend_id" = "10006",
"format" = "csv")
where c1 like "%start_time%" limit 10;
+--------------------------------------------------------+
| c1 |
+--------------------------------------------------------+
| start time: 2023 08 07 星期一 23:20:32 CST |
| start time: 2023 08 07 星期一 23:32:10 CST |
| start time: 2023 08 08 星期二 00:20:50 CST |
| start time: 2023 08 08 星期二 00:29:15 CST |
+--------------------------------------------------------+
```
Read and access a csv file located at `${DORIS_HOME}/student.csv`:
```sql
mysql> select * from local(
"file_path" = "student.csv",
"backend_id" = "10003",
"format" = "csv");
+------+---------+--------+
| c1 | c2 | c3 |
+------+---------+--------+
| 1 | alice | 18 |
| 2 | bob | 20 |
| 3 | jack | 24 |
| 4 | jackson | 19 |
| 5 | liming | d18 |
+------+---------+--------+
```
It can be used together with `desc function`:
```sql
mysql> desc function local(
"file_path" = "student.csv",
"backend_id" = "10003",
"format" = "csv");
+-------+------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-------+---------+-------+
| c1 | TEXT | Yes | false | NULL | NONE |
| c2 | TEXT | Yes | false | NULL | NONE |
| c3 | TEXT | Yes | false | NULL | NONE |
+-------+------+------+-------+---------+-------+
```
### Keywords
local, table-valued-function, tvf
### Best Practice
For more detailed usage of the local tvf, please refer to the [S3](./s3.md) tvf; the only difference between them is the way of accessing the storage system.

View File

@ -27,6 +27,7 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
@ -103,11 +104,12 @@ public class TableValuedFunctionRef extends TableRef {
            return;
        }
-       // check privilige for backends tvf
-       if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)) {
+       // check privilige for backends/local tvf
+       if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)
+               || funcName.equalsIgnoreCase(LocalTableValuedFunction.NAME)) {
            if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)
                    && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
-                           PrivPredicate.OPERATOR)) {
+                   PrivPredicate.OPERATOR)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR");
            }
        }

View File

@ -364,7 +364,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
}
-       } else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) {
+       } else if ((locationType == TFileType.FILE_S3 || locationType == TFileType.FILE_LOCAL)
+               && !params.isSetProperties()) {
            params.setProperties(locationProperties);
        }
@ -405,6 +406,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (locationType == TFileType.FILE_S3
|| locationType == TFileType.FILE_BROKER
|| locationType == TFileType.FILE_LOCAL
|| locationType == TFileType.FILE_NET) {
// need full path
rangeDesc.setPath(fileSplit.getPath().toString());

View File

@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
@ -27,7 +28,9 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
@ -40,6 +43,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -62,6 +66,21 @@ public class TVFScanNode extends FileQueryScanNode {
    }

    @Override
    protected void initBackendPolicy() throws UserException {
        List<String> preferLocations = new ArrayList<>();
        if (tableValuedFunction instanceof LocalTableValuedFunction) {
            // For local tvf, the backend is specified by backendId
            Long backendId = ((LocalTableValuedFunction) tableValuedFunction).getBackendId();
            Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
            if (backend == null) {
                throw new UserException("Backend " + backendId + " does not exist");
            }
            preferLocations.add(backend.getHost());
        }
        backendPolicy.init(preferLocations);
        numNodes = backendPolicy.numBackends();
    }

    protected String getFsName(FileSplit split) {
        return tableValuedFunction.getFsName();
    }

View File

@ -147,6 +147,10 @@ public class BackendServiceClient {
return stub.reportStreamLoadStatus(request);
}
    public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) {
        return stub.glob(request);
    }
public void shutdown() {
if (!channel.isShutdown()) {
channel.shutdown();

View File

@ -399,4 +399,16 @@ public class BackendServiceProxy {
}
}
    public Future<InternalService.PGlobResponse> glob(TNetworkAddress address,
            InternalService.PGlobRequest request) throws RpcException {
        try {
            final BackendServiceClient client = getProxy(address);
            return client.glob(request);
        } catch (Throwable e) {
            LOG.warn("failed to glob dir from BE {}:{}, path: {}, error: ",
                    address.getHostname(), address.getPort(), request.getPattern());
            throw new RpcException(address.hostname, e.getMessage());
        }
    }
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
@ -365,18 +366,13 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
return columns;
}
// get one BE address
-       TNetworkAddress address = null;
        columns = Lists.newArrayList();
-       for (Backend be : org.apache.doris.catalog.Env.getCurrentSystemInfo().getIdToBackend().values()) {
-           if (be.isAlive()) {
-               address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-               break;
-           }
-       }
-       if (address == null) {
+       Backend be = getBackend();
+       if (be == null) {
            throw new AnalysisException("No Alive backends");
        }
+       TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
PFetchTableSchemaRequest request = getFetchTableStructureRequest();
Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
@ -408,6 +404,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
return columns;
}
    protected Backend getBackend() {
        for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
            if (be.isAlive()) {
                return be;
            }
        }
        return null;
    }
/**
* Convert PTypeDesc into doris column type
*

View File

@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGlobResponse;
import org.apache.doris.proto.InternalService.PGlobResponse.PFileInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * The implementation of the table-valued function
 * local("file_path" = "path/to/file.txt", "backend_id" = "be_id").
 */
public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
    private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class);

    public static final String NAME = "local";
    public static final String FILE_PATH = "file_path";
    public static final String BACKEND_ID = "backend_id";

    private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
            .add(FILE_PATH)
            .add(BACKEND_ID)
            .build();

    private String filePath;
    private long backendId;

    public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException {
        Map<String, String> fileFormatParams = new CaseInsensitiveMap();
        locationProperties = Maps.newHashMap();
        for (String key : params.keySet()) {
            if (FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
                fileFormatParams.put(key, params.get(key));
            } else if (LOCATION_PROPERTIES.contains(key.toLowerCase())) {
                locationProperties.put(key.toLowerCase(), params.get(key));
            } else {
                throw new AnalysisException(key + " is invalid property");
            }
        }

        if (!locationProperties.containsKey(FILE_PATH)) {
            throw new AnalysisException(String.format("Configuration '%s' is required.", FILE_PATH));
        }
        if (!locationProperties.containsKey(BACKEND_ID)) {
            throw new AnalysisException(String.format("Configuration '%s' is required.", BACKEND_ID));
        }
        filePath = locationProperties.get(FILE_PATH);
        backendId = Long.parseLong(locationProperties.get(BACKEND_ID));
        parseProperties(fileFormatParams);
        getFileListFromBackend();
    }

    private void getFileListFromBackend() throws AnalysisException {
        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
        if (be == null) {
            throw new AnalysisException("backend not found with backend_id = " + backendId);
        }

        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
        TNetworkAddress address = be.getBrpcAdress();
        InternalService.PGlobRequest.Builder requestBuilder = InternalService.PGlobRequest.newBuilder();
        requestBuilder.setPattern(filePath);
        try {
            Future<PGlobResponse> response = proxy.glob(address, requestBuilder.build());
            PGlobResponse globResponse = response.get(5, TimeUnit.SECONDS);
            if (globResponse.getStatus().getStatusCode() != 0) {
                throw new AnalysisException(
                        "error code: " + globResponse.getStatus().getStatusCode()
                        + ", error msg: " + globResponse.getStatus().getErrorMsgsList());
            }
            for (PFileInfo file : globResponse.getFilesList()) {
                fileStatuses.add(new TBrokerFileStatus(file.getFile().trim(), false, file.getSize(), true));
                LOG.info("get file from backend success. file: {}, size: {}", file.getFile(), file.getSize());
            }
        } catch (Exception e) {
            throw new AnalysisException("get file list from backend failed. " + e.getMessage());
        }
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_LOCAL;
    }

    @Override
    public String getFilePath() {
        return filePath;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return new BrokerDesc("LocalTvfBroker", StorageType.LOCAL, locationProperties);
    }

    @Override
    public String getTableName() {
        return "LocalTableValuedFunction";
    }

    public Long getBackendId() {
        return backendId;
    }

    @Override
    protected Backend getBackend() {
        return Env.getCurrentSystemInfo().getBackend(backendId);
    }
}

View File

@ -51,6 +51,8 @@ public abstract class TableValuedFunctionIf {
                return new HdfsTableValuedFunction(params);
            case StreamTableValuedFunction.NAME:
                return new StreamTableValuedFunction(params);
            case LocalTableValuedFunction.NAME:
                return new LocalTableValuedFunction(params);
            case IcebergTableValuedFunction.NAME:
                return new IcebergTableValuedFunction(params);
            case BackendsTableValuedFunction.NAME:

View File

@ -679,6 +679,19 @@ message PReportStreamLoadStatusResponse {
    optional PStatus status = 1;
}

message PGlobRequest {
    optional string pattern = 1;
}

message PGlobResponse {
    message PFileInfo {
        optional string file = 1;
        optional int64 size = 2;
    };
    required PStatus status = 1;
    repeated PFileInfo files = 2;
}

service PBackendService {
    rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
    rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@ -717,5 +730,6 @@ service PBackendService {
    rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse);
    rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse);
    rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse);
    rpc glob(PGlobRequest) returns (PGlobResponse);
};

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This suite tests the `local` tvf
suite("test_local_tvf") {
    List<List<Object>> table = sql """ select * from backends(); """
    assertTrue(table.size() > 0)
    def be_id = table[0][0]

    table = sql """
        select count(*) from local(
            "file_path" = "log/be.out",
            "backend_id" = "${be_id}",
            "format" = "csv")
        where c1 like "%start_time%";"""
    assertTrue(table.size() > 0)
    assertTrue(Long.valueOf(table[0][0]) > 0)

    table = sql """
        select count(*) from local(
            "file_path" = "log/*.out",
            "backend_id" = "${be_id}",
            "format" = "csv")
        where c1 like "%start_time%";"""
    assertTrue(table.size() > 0)
    assertTrue(Long.valueOf(table[0][0]) > 0)

    test {
        sql """
            select count(*) from local(
                "file_path" = "../log/be.out",
                "backend_id" = "${be_id}",
                "format" = "csv")
            where c1 like "%start_time%";
        """
        // check exception message contains
        exception "can not contain '..' in path"
    }

    test {
        sql """
            select count(*) from local(
                "file_path" = "./log/xx.out",
                "backend_id" = "${be_id}",
                "format" = "csv")
            where c1 like "%start_time%";
        """
        // check exception message contains
        exception "No matches found"
    }
}