From 27549564a7000778ed02ae2f379cbb1126ea7bcd Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Sun, 6 Nov 2022 11:04:26 +0800 Subject: [PATCH] [feature](table-valued-function) Support S3 tvf (#13959) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR does three things: 1. Refactored the table-valued-function (TVF) framework. 2. Added BE support for the `fetch_table_schema` RPC. 3. Implemented the `S3(path, AK, SK, format)` table-valued function. --- be/src/exec/exec_node.cpp | 10 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/service/internal_service.cpp | 74 ++++++ be/src/service/internal_service.h | 5 + be/src/vec/CMakeLists.txt | 4 +- .../vdata_gen_function_inf.h} | 6 +- .../vnumbers_tvf.cpp} | 13 +- .../vnumbers_tvf.h} | 8 +- be/src/vec/exec/format/csv/csv_reader.cpp | 148 ++++++++++- be/src/vec/exec/format/csv/csv_reader.h | 19 ++ be/src/vec/exec/format/generic_reader.h | 5 + ...n_scannode.cpp => vdata_gen_scan_node.cpp} | 35 ++- ...ction_scannode.h => vdata_gen_scan_node.h} | 11 +- .../analysis/TableValuedFunctionRef.java | 14 +- .../doris/catalog/FunctionGenTable.java | 11 +- .../org/apache/doris/common/util/S3URI.java | 4 +- ...tionScanNode.java => DataGenScanNode.java} | 26 +- .../doris/planner/DistributedPlanner.java | 2 +- .../doris/planner/SingleNodePlanner.java | 3 +- .../external/ExternalFileScanNode.java | 63 +++-- ...viderIf.java => HMSTableScanProvider.java} | 12 +- .../planner/external/HiveScanProvider.java | 141 +---------- .../planner/external/QueryScanProvider.java | 183 ++++++++++++++ .../planner/external/TVFScanProvider.java | 142 +++++++++++ .../doris/rpc/BackendServiceClient.java | 5 + .../apache/doris/rpc/BackendServiceProxy.java | 12 + .../DataGenTableValuedFunction.java | 30 +++ .../ExternalFileTableValuedFunction.java | 233 ++++++++++++++++++ .../NumbersTableValuedFunction.java | 29 ++- .../tablefunction/S3TableValuedFunction.java | 111 +++++++++ ...ionInf.java => TableValuedFunctionIf.java} | 33 ++- gensrc/proto/internal_service.proto | 12 + gensrc/thrift/PlanNodes.thrift | 14 +- 33 files changed, 1156 insertions(+), 266 deletions(-) rename be/src/vec/exec/{tablefunction/vtable_valued_function_inf.h => data_gen_functions/vdata_gen_function_inf.h} (90%) rename be/src/vec/exec/{tablefunction/vnumbers_tbf.cpp => data_gen_functions/vnumbers_tvf.cpp} (87%) rename be/src/vec/exec/{tablefunction/vnumbers_tbf.h => data_gen_functions/vnumbers_tvf.h} (87%) rename be/src/vec/exec/{vtable_valued_function_scannode.cpp => vdata_gen_scan_node.cpp} (69%) rename be/src/vec/exec/{vtable_valued_function_scannode.h => vdata_gen_scan_node.h} (84%) rename fe/fe-core/src/main/java/org/apache/doris/planner/{TableValuedFunctionScanNode.java => DataGenScanNode.java} (78%) rename fe/fe-core/src/main/java/org/apache/doris/planner/external/{HMSTableScanProviderIf.java => HMSTableScanProvider.java} (75%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java rename fe/fe-core/src/main/java/org/apache/doris/tablefunction/{TableValuedFunctionInf.java =>
TableValuedFunctionIf.java} (52%) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ed1389bcf0..ce7bb4d317 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -69,6 +69,7 @@ #include "vec/exec/vanalytic_eval_node.h" #include "vec/exec/vassert_num_rows_node.h" #include "vec/exec/vbroker_scan_node.h" +#include "vec/exec/vdata_gen_scan_node.h" #include "vec/exec/vempty_set_node.h" #include "vec/exec/ves_http_scan_node.h" #include "vec/exec/vexcept_node.h" @@ -84,7 +85,6 @@ #include "vec/exec/vselect_node.h" #include "vec/exec/vsort_node.h" #include "vec/exec/vtable_function_node.h" -#include "vec/exec/vtable_valued_function_scannode.h" #include "vec/exec/vunion_node.h" #include "vec/exprs/vexpr.h" @@ -417,7 +417,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::REPEAT_NODE: case TPlanNodeType::TABLE_FUNCTION_NODE: case TPlanNodeType::BROKER_SCAN_NODE: - case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: + case TPlanNodeType::DATA_GEN_SCAN_NODE: case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::JDBC_SCAN_NODE: break; @@ -650,9 +650,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); - case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: + case TPlanNodeType::DATA_GEN_SCAN_NODE: if (state->enable_vectorized_exec()) { - *node = pool->add(new vectorized::VTableValuedFunctionScanNode(pool, tnode, descs)); + *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs)); return Status::OK(); } else { error_msg << "numbers table function only support vectorized execution"; @@ -721,7 +721,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); - collect_nodes(TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index cad45d2468..04d32572af 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -748,8 +748,8 @@ bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || type == TPlanNodeType::BROKER_SCAN_NODE || type == TPlanNodeType::ES_SCAN_NODE || type == TPlanNodeType::ES_HTTP_SCAN_NODE || type == TPlanNodeType::ODBC_SCAN_NODE || - type == TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE || - type == TPlanNodeType::FILE_SCAN_NODE || type == TPlanNodeType::JDBC_SCAN_NODE; + type == TPlanNodeType::DATA_GEN_SCAN_NODE || type == TPlanNodeType::FILE_SCAN_NODE || + type == TPlanNodeType::JDBC_SCAN_NODE; } Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 9d0c4d0383..3e672ab910 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -44,11 +44,14 @@ #include "util/md5.h" #include "util/proto_util.h" #include "util/ref_count_closure.h" +#include "util/s3_uri.h" #include "util/string_util.h" #include "util/telemetry/brpc_carrier.h" #include "util/telemetry/telemetry.h" #include "util/thrift_util.h" #include "util/uid_util.h" +#include "vec/exec/format/csv/csv_reader.h" +#include 
"vec/exec/format/generic_reader.h" #include "vec/runtime/vdata_stream_mgr.h" namespace doris { @@ -409,6 +412,77 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); } +void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, + const PFetchTableSchemaRequest* request, + PFetchTableSchemaResult* result, + google::protobuf::Closure* done) { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + } + + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + // file_slots is no use + std::vector file_slots; + std::unique_ptr reader(nullptr); + std::unique_ptr profile(new RuntimeProfile("FetchTableSchema")); + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::unordered_map name_to_col_type; + std::vector col_names; + std::vector col_types; + st = reader->get_parsered_schema(&col_names, &col_types); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } + st.to_protobuf(result->mutable_status()); +} + void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index f07d5c89f3..3ea3655974 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -68,6 +68,11 @@ public: void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) override; + void fetch_table_schema(google::protobuf::RpcController* controller, + const PFetchTableSchemaRequest* request, + PFetchTableSchemaResult* result, + google::protobuf::Closure* done) 
override; + void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 71587587a4..6d44d0cff9 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -116,8 +116,8 @@ set(VEC_FILES exec/vparquet_scanner.cpp exec/vorc_scanner.cpp exec/join/vhash_join_node.cpp - exec/tablefunction/vnumbers_tbf.cpp - exec/vtable_valued_function_scannode.cpp + exec/data_gen_functions/vnumbers_tvf.cpp + exec/vdata_gen_scan_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp exprs/vexpr.cpp diff --git a/be/src/vec/exec/tablefunction/vtable_valued_function_inf.h b/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h similarity index 90% rename from be/src/vec/exec/tablefunction/vtable_valued_function_inf.h rename to be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h index 89c9222200..86e453dbb5 100644 --- a/be/src/vec/exec/tablefunction/vtable_valued_function_inf.h +++ b/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h @@ -30,12 +30,12 @@ class Status; namespace vectorized { -class VTableValuedFunctionInf { +class VDataGenFunctionInf { public: - VTableValuedFunctionInf(TupleId tuple_id, const TupleDescriptor* tuple_desc) + VDataGenFunctionInf(TupleId tuple_id, const TupleDescriptor* tuple_desc) : _tuple_id(tuple_id), _tuple_desc(tuple_desc) {} - virtual ~VTableValuedFunctionInf() = default; + virtual ~VDataGenFunctionInf() = default; // Should set function parameters in this method virtual Status set_scan_ranges(const std::vector& scan_ranges) = 0; diff --git a/be/src/vec/exec/tablefunction/vnumbers_tbf.cpp b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp similarity index 87% rename from be/src/vec/exec/tablefunction/vnumbers_tbf.cpp rename to be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp index f05848ca61..33a179c3ae 100644 --- a/be/src/vec/exec/tablefunction/vnumbers_tbf.cpp +++ b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-#include "vec/exec/tablefunction/vnumbers_tbf.h" +#include "vec/exec/data_gen_functions/vnumbers_tvf.h" #include @@ -29,10 +29,10 @@ namespace doris::vectorized { -VNumbersTBF::VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc) - : VTableValuedFunctionInf(tuple_id, tuple_desc) {} +VNumbersTVF::VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc) + : VDataGenFunctionInf(tuple_id, tuple_desc) {} -Status VNumbersTBF::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { +Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { bool mem_reuse = block->mem_reuse(); DCHECK(block->rows() == 0); std::vector columns(_slot_num); @@ -75,8 +75,9 @@ Status VNumbersTBF::get_next(RuntimeState* state, vectorized::Block* block, bool return Status::OK(); } -Status VNumbersTBF::set_scan_ranges(const std::vector& scan_range_params) { - _total_numbers = scan_range_params[0].scan_range.tvf_scan_range.numbers_params.totalNumbers; +Status VNumbersTVF::set_scan_ranges(const std::vector& scan_range_params) { + _total_numbers = + scan_range_params[0].scan_range.data_gen_scan_range.numbers_params.totalNumbers; return Status::OK(); } diff --git a/be/src/vec/exec/tablefunction/vnumbers_tbf.h b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h similarity index 87% rename from be/src/vec/exec/tablefunction/vnumbers_tbf.h rename to be/src/vec/exec/data_gen_functions/vnumbers_tvf.h index c2b5199c85..0c83aae98f 100644 --- a/be/src/vec/exec/tablefunction/vnumbers_tbf.h +++ b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h @@ -20,7 +20,7 @@ #include #include "runtime/descriptors.h" -#include "vec/exec/tablefunction/vtable_valued_function_inf.h" +#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h" namespace doris { @@ -33,10 +33,10 @@ class Status; namespace vectorized { -class VNumbersTBF : public VTableValuedFunctionInf { +class VNumbersTVF : public VDataGenFunctionInf { public: - VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc); - ~VNumbersTBF() = default; + VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc); + ~VNumbersTVF() = default; Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index b7661e4109..2a115d8f07 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -59,6 +59,23 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); } +CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + const std::vector& file_slot_descs) + : _state(nullptr), + _profile(profile), + _params(params), + _range(range), + _file_slot_descs(file_slot_descs), + _line_reader(nullptr), + _line_reader_eof(false), + _text_converter(nullptr), + _decompressor(nullptr) { + _file_format_type = _params.format_type; + _file_compress_type = _params.compress_type; + _size = _range.size; +} + CsvReader::~CsvReader() {} Status CsvReader::init_reader(bool is_load) { @@ -185,6 +202,32 @@ Status CsvReader::get_columns(std::unordered_map* n return Status::OK(); } +Status CsvReader::get_parsered_schema(std::vector* col_names, + std::vector* col_types) { + size_t read_line = 0; + bool is_parse_name = false; + RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name)); + + if (read_line == 1) { + if (!is_parse_name) 
{ // parse csv file without names and types + size_t col_nums = 0; + RETURN_IF_ERROR(_parse_col_nums(&col_nums)); + for (size_t i = 0; i < col_nums; ++i) { + col_names->emplace_back("c" + std::to_string(i + 1)); + } + } else { // parse csv file with names + RETURN_IF_ERROR(_parse_col_names(col_names)); + } + for (size_t j = 0; j < col_names->size(); ++j) { + col_types->emplace_back(TypeDescriptor::create_string_type()); + } + } else { // parse csv file with names and types + RETURN_IF_ERROR(_parse_col_names(col_names)); + RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types)); + } + return Status::OK(); +} + Status CsvReader::_create_decompressor() { CompressType compress_type; if (_file_compress_type != TFileCompressType::UNKNOWN) { @@ -362,7 +405,8 @@ void CsvReader::_split_line(const Slice& line) { // Match a separator non_space = curpos; // Trim tailing spaces. Be consistent with hive and trino's behavior. - if (_state->trim_tailing_spaces_for_external_table_query()) { + if (_state != nullptr && + _state->trim_tailing_spaces_for_external_table_query()) { while (non_space > start && *(value + non_space - 1) == ' ') { non_space--; } @@ -378,7 +422,7 @@ void CsvReader::_split_line(const Slice& line) { CHECK(curpos == line.size) << curpos << " vs " << line.size; non_space = curpos; - if (_state->trim_tailing_spaces_for_external_table_query()) { + if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) { while (non_space > start && *(value + non_space - 1) == ' ') { non_space--; } @@ -422,4 +466,104 @@ bool CsvReader::_is_array(const Slice& slice) { return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']'; } +Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { + int64_t start_offset = _range.start_offset; + if (start_offset != 0) { + return Status::InvalidArgument( + "start offset of TFileRangeDesc must be zero when parsing schema"); + } + if (_params.file_type == TFileType::FILE_STREAM || + _params.file_type == TFileType::FILE_BROKER) { + return Status::InternalError( + "Getting parsed schema from csv file does not support stream load or broker " + "load."); + } + + // default: csv file without a names line or a types line. + *read_line = 1; + *is_parse_name = false; + + if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && + _params.file_attributes.header_type.size() > 0) { + std::string header_type = to_lower(_params.file_attributes.header_type); + if (header_type == BeConsts::CSV_WITH_NAMES) { + *is_parse_name = true; + } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { + *read_line = 2; + *is_parse_name = true; + } + } + + // create and open file reader + RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, _range.path, start_offset, + _range.file_size, 0, _file_reader)); + RETURN_IF_ERROR(_file_reader->open()); + if (_file_reader->size() == 0) { + return Status::EndOfFile("Empty File"); + } + + // get column_separator and line_delimiter + _value_separator = _params.file_attributes.text_params.column_separator; + _value_separator_length = _value_separator.size(); + _line_delimiter = _params.file_attributes.text_params.line_delimiter; + _line_delimiter_length = _line_delimiter.size(); + + // create decompressor.
+ // _decompressor may be nullptr if this is not a compressed file + RETURN_IF_ERROR(_create_decompressor()); + + _line_reader.reset(new PlainTextLineReader(_profile, _file_reader.get(), _decompressor.get(), + _size, _line_delimiter, _line_delimiter_length)); + return Status::OK(); +} + +Status CsvReader::_parse_col_nums(size_t* col_nums) { + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof)); + if (size == 0) { + return Status::InternalError("The first line is empty, cannot parse column numbers"); + } + if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) { + return Status::InternalError("Only csv data in utf8 codec is supported"); + } + _split_line(Slice(ptr, size)); + *col_nums = _split_values.size(); + return Status::OK(); +} + +Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) { + const uint8_t* ptr = nullptr; + size_t size = 0; + // _line_reader_eof is not used here + RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof)); + if (size == 0) { + return Status::InternalError("The first line is empty, cannot parse column names"); + } + if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) { + return Status::InternalError("Only csv data in utf8 codec is supported"); + } + _split_line(Slice(ptr, size)); + for (size_t idx = 0; idx < _split_values.size(); ++idx) { + col_names->emplace_back(_split_values[idx].to_string()); + } + return Status::OK(); +} + +// TODO(ftw): parse type +Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* col_types) { + // Temporary: treat all columns as string until type parsing is implemented. + for (size_t i = 0; i < col_nums; ++i) { + col_types->emplace_back(TypeDescriptor::create_string_type()); + } + + // 1. check _line_reader_eof + // 2. read line + // 3. check utf8 + // 4. check size + // 5. check that _split_values.size() equals col_nums + // 6. fill col_types + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index cbb1b2c882..e7d2f7f7c0 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -34,6 +34,9 @@ public: CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs); + + CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs); ~CsvReader() override; Status init_reader(bool is_query); @@ -41,7 +44,16 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; + // Get schema of csv file from the first line or the first two lines. + // For csv formats (FORMAT_CSV_PLAIN through FORMAT_CSV_DEFLATE): + // 1. if header_type is empty, get schema from the first line. + // 2. if header_type is CSV_WITH_NAMES, get schema from the first line. + // 3. if header_type is CSV_WITH_NAMES_AND_TYPES, get schema from the first two lines. + Status get_parsered_schema(std::vector<std::string>* col_names, + std::vector<TypeDescriptor>* col_types) override; + private: + // used for stream/broker load of csv file. Status _create_decompressor(); Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows); Status _line_split_to_values(const Slice& line, bool* success); @@ -50,6 +62,13 @@ private: bool _is_null(const Slice& slice); bool _is_array(const Slice& slice); + // used for parsing the table schema of csv files.
+ Status _prepare_parse(size_t* read_line, bool* is_parse_name); + Status _parse_col_nums(size_t* col_nums); + Status _parse_col_names(std::vector* col_names); + // TODO(ftw): parse type + Status _parse_col_types(size_t col_nums, std::vector* col_types); + private: RuntimeState* _state; RuntimeProfile* _profile; diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 8fc09a68a3..e098557b82 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -37,6 +37,11 @@ public: std::unordered_set* missing_cols) { return Status::NotSupported("get_columns is not implemented"); } + + virtual Status get_parsered_schema(std::vector* col_names, + std::vector* col_types) { + return Status::NotSupported("get_parser_schema is not implemented for this reader."); + } virtual ~GenericReader() = default; }; diff --git a/be/src/vec/exec/vtable_valued_function_scannode.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp similarity index 69% rename from be/src/vec/exec/vtable_valued_function_scannode.cpp rename to be/src/vec/exec/vdata_gen_scan_node.cpp index ba015c5877..26ae698795 100644 --- a/be/src/vec/exec/vtable_valued_function_scannode.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "vec/exec/vtable_valued_function_scannode.h" +#include "vec/exec/vdata_gen_scan_node.h" #include @@ -26,28 +26,28 @@ #include "runtime/string_value.h" #include "runtime/tuple_row.h" #include "util/runtime_profile.h" -#include "vec/exec/tablefunction/vnumbers_tbf.h" +#include "vec/exec/data_gen_functions/vnumbers_tvf.h" namespace doris::vectorized { -VTableValuedFunctionScanNode::VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) +VDataGenFunctionScanNode::VDataGenFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), _is_init(false), - _tuple_id(tnode.table_valued_func_scan_node.tuple_id), + _tuple_id(tnode.data_gen_scan_node.tuple_id), _tuple_desc(nullptr) { // set _table_func here - switch (tnode.table_valued_func_scan_node.func_name) { - case TTVFunctionName::NUMBERS: - _table_func = std::make_shared(_tuple_id, _tuple_desc); + switch (tnode.data_gen_scan_node.func_name) { + case TDataGenFunctionName::NUMBERS: + _table_func = std::make_shared(_tuple_id, _tuple_desc); break; default: LOG(FATAL) << "Unsupported function type"; } } -Status VTableValuedFunctionScanNode::prepare(RuntimeState* state) { - VLOG_CRITICAL << "VTableValuedFunctionScanNode::Prepare"; +Status VDataGenFunctionScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VDataGenFunctionScanNode::Prepare"; if (_is_init) { return Status::OK(); @@ -70,7 +70,7 @@ Status VTableValuedFunctionScanNode::prepare(RuntimeState* state) { return Status::OK(); } -Status VTableValuedFunctionScanNode::open(RuntimeState* state) { +Status VDataGenFunctionScanNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); if (nullptr == state) { @@ -86,13 +86,13 @@ Status VTableValuedFunctionScanNode::open(RuntimeState* state) { return Status::OK(); } -Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - LOG(FATAL) << "VTableValuedFunctionScanNode only support vectorized execution"; +Status VDataGenFunctionScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { + LOG(FATAL) << 
"VDataGenFunctionScanNode only support vectorized execution"; return Status::OK(); } -Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block, - bool* eos) { +Status VDataGenFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block, + bool* eos) { if (state == nullptr || block == nullptr || eos == nullptr) { return Status::InternalError("input is NULL pointer"); } @@ -103,7 +103,7 @@ Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, vectorized::B return res; } -Status VTableValuedFunctionScanNode::close(RuntimeState* state) { +Status VDataGenFunctionScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } @@ -113,8 +113,7 @@ Status VTableValuedFunctionScanNode::close(RuntimeState* state) { return ExecNode::close(state); } -Status VTableValuedFunctionScanNode::set_scan_ranges( - const std::vector& scan_ranges) { +Status VDataGenFunctionScanNode::set_scan_ranges(const std::vector& scan_ranges) { return _table_func->set_scan_ranges(scan_ranges); } diff --git a/be/src/vec/exec/vtable_valued_function_scannode.h b/be/src/vec/exec/vdata_gen_scan_node.h similarity index 84% rename from be/src/vec/exec/vtable_valued_function_scannode.h rename to be/src/vec/exec/vdata_gen_scan_node.h index 9dfca9b63d..73470e8d49 100644 --- a/be/src/vec/exec/vtable_valued_function_scannode.h +++ b/be/src/vec/exec/vdata_gen_scan_node.h @@ -21,7 +21,7 @@ #include "exec/scan_node.h" #include "runtime/descriptors.h" -#include "vec/exec/tablefunction/vtable_valued_function_inf.h" +#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h" namespace doris { @@ -34,11 +34,10 @@ class Status; namespace vectorized { -class VTableValuedFunctionScanNode : public ScanNode { +class VDataGenFunctionScanNode : public ScanNode { public: - VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs); - ~VTableValuedFunctionScanNode() override = default; + VDataGenFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~VDataGenFunctionScanNode() override = default; // initialize _mysql_scanner, and create _text_converter. 
Status prepare(RuntimeState* state) override; @@ -59,7 +58,7 @@ public: Status set_scan_ranges(const std::vector& scan_ranges) override; protected: - std::shared_ptr _table_func; + std::shared_ptr _table_func; bool _is_init; // Tuple id resolved in prepare() to set _tuple_desc; TupleId _tuple_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java index ffb4feb308..a697e68fef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -19,18 +19,20 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Table; import org.apache.doris.common.UserException; -import org.apache.doris.tablefunction.TableValuedFunctionInf; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.tablefunction.TableValuedFunctionIf; import java.util.List; public class TableValuedFunctionRef extends TableRef { private Table table; - private TableValuedFunctionInf tableFunction; + private TableValuedFunctionIf tableFunction; public TableValuedFunctionRef(String funcName, String alias, List params) throws UserException { super(new TableName(null, null, "_table_valued_function_" + funcName), alias); - this.tableFunction = TableValuedFunctionInf.getTableFunction(funcName, params); + this.tableFunction = TableValuedFunctionIf.getTableFunction(funcName, params); if (hasExplicitAlias()) { return; } @@ -70,7 +72,11 @@ public class TableValuedFunctionRef extends TableRef { analyzeJoin(analyzer); } - public TableValuedFunctionInf getTableFunction() { + public ScanNode getScanNode(PlanNodeId id) { + return tableFunction.getScanNode(id, desc); + } + + public TableValuedFunctionIf getTableFunction() { return tableFunction; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionGenTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionGenTable.java index 19660fde89..c4181fa991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionGenTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionGenTable.java @@ -17,12 +17,21 @@ package org.apache.doris.catalog; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + import java.util.List; public class FunctionGenTable extends Table { + private TableValuedFunctionIf tvf; - public FunctionGenTable(long id, String tableName, TableType type, List fullSchema) { + public FunctionGenTable(long id, String tableName, TableType type, List fullSchema, + TableValuedFunctionIf tvf) { super(id, tableName, type, fullSchema); + this.tvf = tvf; + } + + public TableValuedFunctionIf getTvf() { + return tvf; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java index aac04354cb..e119c92323 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java @@ -36,8 +36,8 @@ import java.util.Set; */ public class S3URI { - private static final String SCHEME_DELIM = "://"; - private static final String PATH_DELIM = "/"; + public static final String SCHEME_DELIM = "://"; + public static final String PATH_DELIM = "/"; private static final String QUERY_DELIM = "\\?"; private static final String FRAGMENT_DELIM = "#"; private static final Set VALID_SCHEMES = 
ImmutableSet.of("http", "https", "s3", "s3a", "s3n", "bos"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java similarity index 78% rename from fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index af2c26dfca..2565516150 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -22,14 +22,14 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.tablefunction.TableValuedFunctionInf; +import org.apache.doris.tablefunction.DataGenTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionTask; +import org.apache.doris.thrift.TDataGenScanNode; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.doris.thrift.TTableValuedFunctionScanNode; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -38,17 +38,17 @@ import org.apache.logging.log4j.Logger; import java.util.List; /** - * This scan node is used for table valued function. + * This scan node is used for data source generated from memory. */ -public class TableValuedFunctionScanNode extends ScanNode { - private static final Logger LOG = LogManager.getLogger(TableValuedFunctionScanNode.class.getName()); +public class DataGenScanNode extends ScanNode { + private static final Logger LOG = LogManager.getLogger(DataGenScanNode.class.getName()); private List shardScanRanges; - private TableValuedFunctionInf tvf; + private DataGenTableValuedFunction tvf; private boolean isFinalized = false; - public TableValuedFunctionScanNode(PlanNodeId id, TupleDescriptor desc, - String planNodeName, TableValuedFunctionInf tvf) { + public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, + String planNodeName, DataGenTableValuedFunction tvf) { super(id, desc, planNodeName, StatisticalType.TABLE_VALUED_FUNCTION_NODE); this.tvf = tvf; } @@ -85,11 +85,11 @@ public class TableValuedFunctionScanNode extends ScanNode { @Override protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.TABLE_VALUED_FUNCTION_SCAN_NODE; - TTableValuedFunctionScanNode tvfScanNode = new TTableValuedFunctionScanNode(); - tvfScanNode.setTupleId(desc.getId().asInt()); - tvfScanNode.setFuncName(tvf.getFuncName()); - msg.table_valued_func_scan_node = tvfScanNode; + msg.node_type = TPlanNodeType.DATA_GEN_SCAN_NODE; + TDataGenScanNode dataGenScanNode = new TDataGenScanNode(); + dataGenScanNode.setTupleId(desc.getId().asInt()); + dataGenScanNode.setFuncName(tvf.getDataGenFunctionName()); + msg.data_gen_scan_node = dataGenScanNode; } private List getShardLocations() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index d005a6ff3c..19bfa88719 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -275,7 +275,7 @@ public class DistributedPlanner { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED); } else if (node instanceof SchemaScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); - } else if (node instanceof TableValuedFunctionScanNode) { + } else if (node instanceof DataGenScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); } else if (node instanceof OlapScanNode) { // olap scan node diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 488b8901c7..73a6b1ae25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1935,8 +1935,7 @@ public class SingleNodePlanner { scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (JdbcTable) tblRef.getTable()); break; case TABLE_VALUED_FUNCTION: - scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(), - "TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction()); + scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId()); break; case HMS_EXTERNAL_TABLE: scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index dba614e960..45a7b21747 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.FunctionGenTable; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; @@ -44,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -162,30 +164,13 @@ public class ExternalFileScanNode extends ExternalScanNode { switch (type) { case QUERY: - HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); - Preconditions.checkNotNull(hmsTable); - - if (hmsTable.isView()) { - throw new AnalysisException( - String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), - hmsTable.getDbName(), hmsTable.getName())); + if (this.desc.getTable() instanceof HMSExternalTable) { + HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); + initHMSExternalTable(hmsTable); + } else if (this.desc.getTable() instanceof FunctionGenTable) { + FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); + initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); } - - FileScanProviderIf scanProvider; - switch 
(hmsTable.getDlaType()) { - case HUDI: - scanProvider = new HudiScanProvider(hmsTable, desc); - break; - case ICEBERG: - scanProvider = new IcebergScanProvider(hmsTable, desc); - break; - case HIVE: - scanProvider = new HiveScanProvider(hmsTable, desc); - break; - default: - throw new UserException("Unknown table type: " + hmsTable.getDlaType()); - } - this.scanProviders.add(scanProvider); break; case LOAD: for (FileGroupInfo fileGroupInfo : fileGroupInfos) { @@ -202,6 +187,38 @@ public class ExternalFileScanNode extends ExternalScanNode { initParamCreateContexts(analyzer); } + private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException { + Preconditions.checkNotNull(hmsTable); + + if (hmsTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), + hmsTable.getDbName(), hmsTable.getName())); + } + + FileScanProviderIf scanProvider; + switch (hmsTable.getDlaType()) { + case HUDI: + scanProvider = new HudiScanProvider(hmsTable, desc); + break; + case ICEBERG: + scanProvider = new IcebergScanProvider(hmsTable, desc); + break; + case HIVE: + scanProvider = new HiveScanProvider(hmsTable, desc); + break; + default: + throw new UserException("Unknown table type: " + hmsTable.getDlaType()); + } + this.scanProviders.add(scanProvider); + } + + private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { + Preconditions.checkNotNull(table); + FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, tvf); + this.scanProviders.add(scanProvider); + } + // For each scan provider, create a corresponding ParamCreateContext private void initParamCreateContexts(Analyzer analyzer) throws UserException { for (FileScanProviderIf scanProvider : scanProviders) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java similarity index 75% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java index b602722a31..283fc90f30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java @@ -24,13 +24,11 @@ import org.apache.hadoop.hive.metastore.api.Table; import java.util.Map; -/** - * An interface for hms table scan node to get the need information. 
- */ -public interface HMSTableScanProviderIf extends FileScanProviderIf { - String getMetaStoreUrl(); +public abstract class HMSTableScanProvider extends QueryScanProvider { - Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; + public abstract String getMetaStoreUrl(); - Map getTableProperties() throws MetaNotFoundException; + public abstract Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; + + public abstract Map getTableProperties() throws MetaNotFoundException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 3c1a5235e7..85425f4cb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -29,27 +29,14 @@ import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.external.hive.util.HiveUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; -import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.THdfsParams; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; -import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -59,7 +46,6 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -75,7 +61,7 @@ import java.util.stream.Collectors; /** * A HiveScanProvider to get information for scan node. 
*/ -public class HiveScanProvider implements HMSTableScanProviderIf { +public class HiveScanProvider extends HMSTableScanProvider { private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class); private static final String PROP_FIELD_DELIMITER = "field.delim"; @@ -84,8 +70,6 @@ public class HiveScanProvider implements HMSTableScanProviderIf { protected HMSExternalTable hmsTable; - protected int inputSplitNum = 0; - protected long inputFileSize = 0; protected final TupleDescriptor desc; public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { @@ -257,128 +241,19 @@ public class HiveScanProvider implements HMSTableScanProviderIf { } @Override - public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, - List scanRangeLocations) throws UserException { - try { - List inputSplits = getSplits(context.conjuncts); - this.inputSplitNum = inputSplits.size(); - if (inputSplits.isEmpty()) { - return; - } - - String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); - String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); - String fsName = fullPath.replace(filePath, ""); - TFileType locationType = getLocationType(); - context.params.setFileType(locationType); - TFileFormatType fileFormatType = getFileFormatType(); - context.params.setFormatType(getFileFormatType()); - if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) { - TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - String columnSeparator - = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() - .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER); - textParams.setColumnSeparator(columnSeparator); - textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); - TFileAttributes fileAttributes = new TFileAttributes(); - fileAttributes.setTextParams(textParams); - context.params.setFileAttributes(fileAttributes); - } - - // set hdfs params for hdfs file type. 
- Map locationProperties = getLocationProperties(); - if (locationType == TFileType.FILE_HDFS) { - THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties); - tHdfsParams.setFsName(fsName); - context.params.setHdfsParams(tHdfsParams); - } else if (locationType == TFileType.FILE_S3) { - context.params.setProperties(locationProperties); - } - - TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); - - FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); - - for (InputSplit split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - List pathPartitionKeys = getPathPartitionKeys(); - List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - pathPartitionKeys, false); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.info( - "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " - + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" - + " loaction: " + Joiner.on("|").join(split.getLocations())); - - fileSplitStrategy.update(fileSplit); - // Add a new location when it's can be split - if (fileSplitStrategy.hasNext()) { - scanRangeLocations.add(curLocations); - curLocations = newLocations(context.params, backendPolicy); - fileSplitStrategy.next(); - } - this.inputFileSize += fileSplit.getLength(); - } - if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { - scanRangeLocations.add(curLocations); - } - } catch (IOException e) { - throw new UserException(e); - } + public String getColumnSeparator() throws UserException { + return hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() + .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER); } @Override - public int getInputSplitNum() { - return this.inputSplitNum; + public String getLineSeparator() { + return DEFAULT_LINE_DELIMITER; } @Override - public long getInputFileSize() { - return this.inputFileSize; - } - - private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - fileScanRange.setParams(params); - - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend = backendPolicy.getNextBe(); - location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - locations.addToLocations(location); - - return locations; - } - - private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List columnsFromPath, - List columnsFromPathKeys) - throws DdlException, MetaNotFoundException { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setStartOffset(fileSplit.getStart()); - rangeDesc.setSize(fileSplit.getLength()); - rangeDesc.setColumnsFromPath(columnsFromPath); - rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); - - if (getLocationType() == TFileType.FILE_HDFS) { - 
rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (getLocationType() == TFileType.FILE_S3) { - rangeDesc.setPath(fileSplit.getPath().toString()); - } - return rangeDesc; + public String getHeaderType() { + return ""; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java new file mode 100644 index 0000000000..70154ce59c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Joiner; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public abstract class QueryScanProvider implements FileScanProviderIf { + public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class); + private int inputSplitNum = 0; + private long inputFileSize = 0; + + public abstract String getColumnSeparator() throws UserException; + + public abstract String getLineSeparator(); + + public abstract String getHeaderType(); + + @Override + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + try { + List inputSplits = getSplits(context.conjuncts); + this.inputSplitNum = inputSplits.size(); + if (inputSplits.isEmpty()) { + return; + } + + String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); + String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); + 
String fsName = fullPath.replace(filePath, ""); + TFileType locationType = getLocationType(); + context.params.setFileType(locationType); + TFileFormatType fileFormatType = getFileFormatType(); + context.params.setFormatType(getFileFormatType()); + if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) { + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setColumnSeparator(getColumnSeparator()); + textParams.setLineDelimiter(getLineSeparator()); + TFileAttributes fileAttributes = new TFileAttributes(); + fileAttributes.setTextParams(textParams); + fileAttributes.setHeaderType(getHeaderType()); + context.params.setFileAttributes(fileAttributes); + } + + // set hdfs params for hdfs file type. + Map<String, String> locationProperties = getLocationProperties(); + if (locationType == TFileType.FILE_HDFS) { + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + context.params.setHdfsParams(tHdfsParams); + } else if (locationType == TFileType.FILE_S3) { + context.params.setProperties(locationProperties); + } + + TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); + + FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); + + for (InputSplit split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; + List<String> pathPartitionKeys = getPathPartitionKeys(); + List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + pathPartitionKeys, false); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + LOG.info( + "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" + + " location: " + Joiner.on("|").join(split.getLocations())); + + fileSplitStrategy.update(fileSplit); + // Add a new location when it can be split + if (fileSplitStrategy.hasNext()) { + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, backendPolicy); + fileSplitStrategy.next(); + } + this.inputFileSize += fileSplit.getLength(); + } + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { + scanRangeLocations.add(curLocations); + } + } catch (IOException e) { + throw new UserException(e); + } + } + + @Override + public int getInputSplitNum() { + return this.inputSplitNum; + } + + @Override + public long getInputFileSize() { + return this.inputFileSize; + } + + private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) { + // Generate one file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend selectedBackend = backendPolicy.getNextBe(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + 
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath, + List<String> columnsFromPathKeys) + throws DdlException, MetaNotFoundException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setColumnsFromPath(columnsFromPath); + rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); + + if (getLocationType() == TFileType.FILE_HDFS) { + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + } else if (getLocationType() == TFileType.FILE_S3) { + rangeDesc.setPath(fileSplit.getPath().toString()); + } + return rangeDesc; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java new file mode 100644 index 0000000000..88627cef71 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
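Note the asymmetry inside createFileRangeDesc above: an HDFS range carries only the path component, because the BE reconstructs the filesystem from the separately-sent fs_name, while an S3 range carries the full URI that the S3 client consumes directly. A standalone illustration of the two forms:

    // Illustration of the two path forms produced by createFileRangeDesc.
    import java.net.URI;

    public class PathFormDemo {
        public static void main(String[] args) {
            URI hdfs = URI.create("hdfs://nameservice1/user/doris/a.csv");
            // HDFS branch: fileSplit.getPath().toUri().getPath()
            System.out.println(hdfs.getPath());  // prints /user/doris/a.csv
            URI s3 = URI.create("s3://bucket/dir/a.csv");
            // S3 branch: fileSplit.getPath().toString(), the full URI
            System.out.println(s3);              // prints s3://bucket/dir/a.csv
        }
    }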
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.FunctionGenTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileType; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class TVFScanProvider extends QueryScanProvider { + private FunctionGenTable tvfTable; + private final TupleDescriptor desc; + private ExternalFileTableValuedFunction tableValuedFunction; + + public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc, + ExternalFileTableValuedFunction tableValuedFunction) { + this.tvfTable = tvfTable; + this.desc = desc; + this.tableValuedFunction = tableValuedFunction; + } + + // =========== implement abstract methods of QueryScanProvider ================= + @Override + public String getColumnSeparator() throws UserException { + return tableValuedFunction.getColumnSeparator(); + } + + @Override + public String getLineSeparator() { + return tableValuedFunction.getLineSeparator(); + } + + @Override + public String getHeaderType() { + return tableValuedFunction.getHeaderType(); + } + + + // =========== implement interface methods of FileScanProviderIf ================ + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + return tableValuedFunction.getTFileFormatType(); + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + return tableValuedFunction.getTFileType(); + } + + @Override + public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException { + List<InputSplit> splits = Lists.newArrayList(); + List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses(); + for (TBrokerFileStatus fileStatus : fileStatuses) { + Path path = new Path(fileStatus.getPath()); + FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(), new String[0]); + splits.add(fileSplit); + } + return splits; + } + + @Override + public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException { + return tableValuedFunction.getLocationProperties(); + } + + @Override + public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return Lists.newArrayList(); + } + + @Override + public ParamCreateContext createContext(Analyzer analyzer) throws UserException { + ParamCreateContext context = new ParamCreateContext(); + context.params = new TFileScanRangeParams(); + context.destTupleDescriptor = desc; + context.params.setDestTupleId(desc.getId().asInt()); + // Not used; set only to avoid a null exception.
+ context.fileGroup = new BrokerFileGroup(tvfTable.getId(), "", ""); + + + // Hive tables must extract partition values from the path, while hudi/iceberg tables keep + // partition fields in the file. + List<String> partitionKeys = getPathPartitionKeys(); + List<Column> columns = tvfTable.getBaseSchema(false); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slot.getId().asInt()); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + context.params.addToRequiredSlots(slotInfo); + } + return context; + } + + @Override + public TableIf getTargetTable() { + return tvfTable; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index d0d3e4c19f..fc7729d882 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -93,6 +93,11 @@ public class BackendServiceClient { return blockingStub.fetchData(request); } + public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync( + InternalService.PFetchTableSchemaRequest request) { + return stub.fetchTableSchema(request); + } + public Future<InternalService.PUpdateCacheResult> updateCache(InternalService.PUpdateCacheRequest request) { return stub.updateCache(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 466a7b1525..a7c2df6e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -189,6 +189,18 @@ public class BackendServiceProxy { } } + public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync( + TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.fetchTableStructureAsync(request); + } catch (Throwable e) { + LOG.warn("fetch table structure caught an exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future<InternalService.PUpdateCacheResult> updateCache( TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java new file mode 100644 index 0000000000..ab3579e9d8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TDataGenFunctionName; + +import java.util.List; + +public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf { + public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException; + + public abstract TDataGenFunctionName getDataGenFunctionName(); + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java new file mode 100644 index 0000000000..d6b6af0ce0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
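The class below is the FE-side core of this patch: it derives a TVF's schema by serializing a TFileScanRange for the first matched file and asking an alive BE over the new fetch_table_schema RPC. A hedged sketch of the call sequence a consumer goes through (it assumes a running FE with live BEs and reachable S3 data, so it shows the shape of the flow rather than a standalone test):

    // Sketch of the resolution flow, assuming the classes added in this patch
    // and a running FE environment; credentials and path are placeholders.
    import com.google.common.collect.Lists;
    import org.apache.doris.catalog.FunctionGenTable;
    import org.apache.doris.tablefunction.TableValuedFunctionIf;

    public class TvfSchemaFlowSketch {
        public static void main(String[] args) throws Exception {
            // 1. Resolve the TVF by name with positional params: S3(path, AK, SK, format).
            TableValuedFunctionIf tvf = TableValuedFunctionIf.getTableFunction("s3",
                    Lists.newArrayList("s3://bucket/dir/a.csv", "my_ak", "my_sk", "csv"));
            // 2. getTable() lazily builds a FunctionGenTable; its columns come from
            //    getTableColumns(), which issues the fetch_table_schema RPC to a BE.
            FunctionGenTable table = tvf.getTable();
            table.getBaseSchema(false).forEach(col -> System.out.println(col.getName()));
        }
    }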
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest; +import org.apache.doris.proto.Types.PScalarType; +import org.apache.doris.proto.Types.PTypeDesc; +import org.apache.doris.proto.Types.PTypeNode; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPrimitiveType; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL table-valued functions. + */ +public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf { + public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class); + public static final String DEFAULT_COLUMN_SEPARATOR = ","; + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + protected List<Column> columns = null; + protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList(); + protected Map<String, String> locationProperties; + + protected TFileFormatType fileFormatType; + protected String headerType = ""; + + protected String columnSeparator = DEFAULT_COLUMN_SEPARATOR; + protected String lineDelimiter = DEFAULT_LINE_DELIMITER; + + public abstract TFileType getTFileType(); + + public abstract String getFilePath(); + + public abstract BrokerDesc getBrokerDesc(); + + public TFileFormatType getTFileFormatType() { + return fileFormatType; + } + + public Map<String, String> getLocationProperties() { + return locationProperties; + } + + public String getColumnSeparator() { + return columnSeparator; + } + + public String getLineSeparator() { + return lineDelimiter; + } + + public String getHeaderType() { + return headerType; + } + + public void parseFile() throws UserException { + String path = getFilePath(); + BrokerDesc brokerDesc = getBrokerDesc(); + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); + } + + public List<TBrokerFileStatus> getFileStatuses() { + return fileStatuses; + } + + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + return new ExternalFileScanNode(id, desc); + } + + @Override + public List<Column> getTableColumns() throws AnalysisException { + if
(this.columns != null) { + return columns; + } + // get one BE address + TNetworkAddress address = null; + columns = Lists.newArrayList(); + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + break; + } + } + if (address == null) { + throw new AnalysisException("No alive backends"); + } + + try { + PFetchTableSchemaRequest request = getFetchTableStructureRequest(); + Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance() + .fetchTableStructureAsync(address, request); + + InternalService.PFetchTableSchemaResult result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "fetchTableStructureAsync failed. backend address: " + + address.getHostname() + ":" + address.getPort(); + } + throw new AnalysisException(errMsg); + } + + fillColumns(result); + } catch (RpcException e) { + throw new AnalysisException("fetchTableStructureResult rpc exception", e); + } catch (InterruptedException e) { + throw new AnalysisException("fetchTableStructureResult interrupted exception", e); + } catch (ExecutionException e) { + throw new AnalysisException("fetchTableStructureResult exception", e); + } catch (TException e) { + throw new AnalysisException("getFetchTableStructureRequest exception", e); + } + return columns; + } + + private void fillColumns(InternalService.PFetchTableSchemaResult result) + throws AnalysisException { + if (result.getColumnNums() == 0) { + throw new AnalysisException("The number of columns is 0"); + } + for (int idx = 0; idx < result.getColumnNums(); ++idx) { + PTypeDesc type = result.getColumnTypes(idx); + String colName = result.getColumnNames(idx); + for (PTypeNode typeNode : type.getTypesList()) { + // only support ScalarType.
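+ // A nested PTypeDesc (ARRAY/MAP/STRUCT) would carry several type nodes; this + // loop assumes one scalar node per column, which holds for the csv formats + // wired up by this patch.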
+ PScalarType scalarType = typeNode.getScalarType(); + TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(scalarType.getType()); + columns.add(new Column(colName, PrimitiveType.fromThrift(tPrimitiveType), true)); + } + } + } + + private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException { + // set TFileScanRangeParams + TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams(); + fileScanRangeParams.setFileType(getTFileType()); + fileScanRangeParams.setFormatType(fileFormatType); + fileScanRangeParams.setProperties(locationProperties); + fileScanRangeParams.setFileAttributes(getFileAttributes()); + + // get first file, used to parse table schema + TBrokerFileStatus firstFile = null; + for (TBrokerFileStatus fileStatus : fileStatuses) { + if (fileStatus.isIsDir()) { + continue; + } + firstFile = fileStatus; + break; + } + if (firstFile == null) { + throw new AnalysisException("Cannot get the first file, please check the S3 URI."); + } + + // set TFileRangeDesc + TFileRangeDesc fileRangeDesc = new TFileRangeDesc(); + fileRangeDesc.setPath(firstFile.getPath()); + fileRangeDesc.setStartOffset(0); + fileRangeDesc.setSize(firstFile.getSize()); + // set TFileScanRange + TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.addToRanges(fileRangeDesc); + fileScanRange.setParams(fileScanRangeParams); + return InternalService.PFetchTableSchemaRequest.newBuilder() + .setFileScanRange(ByteString.copyFrom(new TSerializer().serialize(fileScanRange))).build(); + } + + private TFileAttributes getFileAttributes() { + TFileAttributes fileAttributes = new TFileAttributes(); + if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) { + TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); + fileTextScanRangeParams.setColumnSeparator(this.columnSeparator); + fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter); + fileAttributes.setTextParams(fileTextScanRangeParams); + fileAttributes.setHeaderType(this.headerType); + } + return fileAttributes; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java index 16c3ae35d2..3be38e7a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java @@ -17,21 +17,22 @@ package org.apache.doris.tablefunction; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.DataGenScanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TDataGenFunctionName; +import org.apache.doris.thrift.TDataGenScanRange; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TTVFNumbersScanRange; -import org.apache.doris.thrift.TTVFScanRange; -import org.apache.doris.thrift.TTVFunctionName; import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collections; @@ -43,9 +44,8 @@ import
java.util.List; /** * The implementation of table-valued function numbers(N,M). */ -public class NumbersTableValuedFunction extends TableValuedFunctionInf { +public class NumbersTableValuedFunction extends DataGenTableValuedFunction { public static final String NAME = "numbers"; - private static final Logger LOG = LogManager.getLogger(PlanNode.class); // The total count of numbers to be generated. private long totalNumbers; // The number of backends that will serve it. @@ -71,8 +71,8 @@ public class NumbersTableValuedFunction extends TableValuedFunctionInf { } @Override - public TTVFunctionName getFuncName() { - return TTVFunctionName.NUMBERS; + public TDataGenFunctionName getDataGenFunctionName() { + return TDataGenFunctionName.NUMBERS; } @Override @@ -102,13 +102,18 @@ public class NumbersTableValuedFunction extends TableValuedFunctionInf { List<TableValuedFunctionTask> res = Lists.newArrayList(); for (int i = 0; i < tabletsNum; ++i) { TScanRange scanRange = new TScanRange(); - TTVFScanRange tvfScanRange = new TTVFScanRange(); + TDataGenScanRange dataGenScanRange = new TDataGenScanRange(); TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange(); tvfNumbersScanRange.setTotalNumbers(totalNumbers); - tvfScanRange.setNumbersParams(tvfNumbersScanRange); - scanRange.setTvfScanRange(tvfScanRange); + dataGenScanRange.setNumbersParams(tvfNumbersScanRange); + scanRange.setDataGenScanRange(dataGenScanRange); res.add(new TableValuedFunctionTask(backendList.get(i % backendList.size()), scanRange)); } return res; } + + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + return new DataGenScanNode(id, desc, "DataGenScanNode", this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java new file mode 100644 index 0000000000..19655fbedd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.S3URI; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import com.google.common.collect.Maps; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; + +/** + * The implementation of table-valued function S3(path, AK, SK, format).
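+ * Positional params, as parsed by the constructor: path, access key, secret key, + * and format (one of csv, csv_with_names, csv_with_names_and_types). + * Example: S3("s3://bucket/dir/a.csv", ak, sk, "csv").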
+ */ +public class S3TableValuedFunction extends ExternalFileTableValuedFunction { + public static final Logger LOG = LogManager.getLogger(S3TableValuedFunction.class); + public static final String NAME = "s3"; + public static final String S3_AK = "AWS_ACCESS_KEY"; + public static final String S3_SK = "AWS_SECRET_KEY"; + public static final String S3_ENDPOINT = "AWS_ENDPOINT"; + public static final String S3_REGION = "AWS_REGION"; + public static final String USE_PATH_STYLE = "use_path_style"; + + private S3URI s3uri; + private String s3AK; + private String s3SK; + + public S3TableValuedFunction(List<String> params) throws UserException { + if (params.size() != 4) { + throw new UserException( + "s3 table function only supports 4 params now: S3(path, AK, SK, format)"); + } + + s3uri = S3URI.create(params.get(0)); + s3AK = params.get(1); + s3SK = params.get(2); + + String formatString = params.get(3).toLowerCase(); + switch (formatString) { + case "csv": + this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + break; + case "csv_with_names": + this.headerType = FeConstants.csv_with_names; + this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + break; + case "csv_with_names_and_types": + this.headerType = FeConstants.csv_with_names_and_types; + this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + break; + default: + throw new AnalysisException("format:" + formatString + " is not supported."); + } + + // set S3 location properties + locationProperties = Maps.newHashMap(); + locationProperties.put(S3_ENDPOINT, s3uri.getBucketScheme()); + locationProperties.put(S3_AK, s3AK); + locationProperties.put(S3_SK, s3SK); + locationProperties.put(S3_REGION, ""); + locationProperties.put(USE_PATH_STYLE, "true"); + + parseFile(); + } + + // =========== implement abstract methods of ExternalFileTableValuedFunction ================= + @Override + public TFileType getTFileType() { + return TFileType.FILE_S3; + } + + @Override + public String getFilePath() { + // must be "s3://..."
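+ // Rebuilt below as "s3" + "://" + key; this reading assumes S3URI (also + // touched in this patch) keeps the bucket inside getKey(), so the original + // "s3://bucket/..." form round-trips.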
+ return NAME + S3URI.SCHEME_DELIM + s3uri.getKey(); + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("S3TvfBroker", StorageType.S3, locationProperties); + } + + // =========== implement abstract methods of TableValuedFunctionIf ================= + @Override + public String getTableName() { + return "S3TableValuedFunction"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java similarity index 52% rename from fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java rename to fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 76c98d2e38..09ef70ccb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -17,36 +17,43 @@ package org.apache.doris.tablefunction; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionGenTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TTVFunctionName; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; import java.util.List; -public abstract class TableValuedFunctionInf { +public abstract class TableValuedFunctionIf { + private FunctionGenTable table = null; - public abstract TTVFunctionName getFuncName(); - - public FunctionGenTable getTable() { - FunctionGenTable table = new FunctionGenTable(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION, - getTableColumns()); + public FunctionGenTable getTable() throws AnalysisException { + if (table == null) { + table = new FunctionGenTable(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION, + getTableColumns(), this); + } return table; } // All table functions should be registered here - public static TableValuedFunctionInf getTableFunction(String funcName, List<String> params) throws UserException { - if (funcName.equalsIgnoreCase(NumbersTableValuedFunction.NAME)) { - return new NumbersTableValuedFunction(params); + public static TableValuedFunctionIf getTableFunction(String funcName, List<String> params) throws UserException { + switch (funcName.toLowerCase()) { + case NumbersTableValuedFunction.NAME: + return new NumbersTableValuedFunction(params); + case S3TableValuedFunction.NAME: + return new S3TableValuedFunction(params); + default: + throw new UserException("Could not find table function " + funcName); } - throw new UserException("Could not find table function " + funcName); } public abstract String getTableName(); - public abstract List<Column> getTableColumns(); + public abstract List<Column> getTableColumns() throws AnalysisException; - public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException; + public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 2379a28646..7d427db582 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -525,6 +525,17 @@ message PTabletWriteSlaveDoneResult { optional PStatus status = 1; }; +message PFetchTableSchemaRequest { + optional bytes file_scan_range = 1; +}; + +message PFetchTableSchemaResult { + optional PStatus status = 1; + optional int32
column_nums = 2; + repeated string column_names = 3; + repeated PTypeDesc column_types = 4; +}; + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -558,5 +569,6 @@ service PBackendService { rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse); rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns (PTabletWriteSlaveResult); rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) returns (PTabletWriteSlaveDoneResult); + rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult); }; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 6dd774ea4a..2df3fea6a2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -53,7 +53,7 @@ enum TPlanNodeType { EXCEPT_NODE, ODBC_SCAN_NODE, TABLE_FUNCTION_NODE, - TABLE_VALUED_FUNCTION_SCAN_NODE, + DATA_GEN_SCAN_NODE, FILE_SCAN_NODE, JDBC_SCAN_NODE, } @@ -324,7 +324,7 @@ struct TExternalScanRange { // TODO: add more scan range type? } -enum TTVFunctionName { +enum TDataGenFunctionName { NUMBERS = 0, } @@ -334,7 +334,7 @@ struct TTVFNumbersScanRange { 1: optional i64 totalNumbers } -struct TTVFScanRange { +struct TDataGenScanRange { 1: optional TTVFNumbersScanRange numbers_params } @@ -347,7 +347,7 @@ struct TScanRange { 6: optional TBrokerScanRange broker_scan_range 7: optional TEsScanRange es_scan_range 8: optional TExternalScanRange ext_scan_range - 9: optional TTVFScanRange tvf_scan_range + 9: optional TDataGenScanRange data_gen_scan_range } struct TMySQLScanNode { @@ -903,9 +903,9 @@ struct TRuntimeFilterDesc { 9: optional i64 bloom_filter_size_bytes } -struct TTableValuedFunctionScanNode { +struct TDataGenScanNode { 1: optional Types.TTupleId tuple_id - 2: optional TTVFunctionName func_name + 2: optional TDataGenFunctionName func_name } // This is essentially a union of all messages corresponding to subclasses @@ -959,7 +959,7 @@ struct TPlanNode { // output column 42: optional list<Types.TSlotId> output_slot_ids - 43: optional TTableValuedFunctionScanNode table_valued_func_scan_node + 43: optional TDataGenScanNode data_gen_scan_node // file scan node 44: optional TFileScanNode file_scan_node
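Taken together, the proto and thrift changes above define the wire contract for schema probing. A condensed sketch of one FE-side round trip over the new fetch_table_schema RPC, mirroring what getTableColumns() does (the BE address and the request are supplied by the caller, with the request built as in getFetchTableStructureRequest()):

    // Condensed sketch of the new RPC's FE-side call pattern; assumes a live BE.
    import java.util.concurrent.Future;
    import org.apache.doris.proto.InternalService;
    import org.apache.doris.rpc.BackendServiceProxy;
    import org.apache.doris.thrift.TNetworkAddress;
    import org.apache.doris.thrift.TStatusCode;

    public class FetchSchemaRoundTrip {
        public static void fetch(TNetworkAddress brpcAddress,
                InternalService.PFetchTableSchemaRequest request) throws Exception {
            Future<InternalService.PFetchTableSchemaResult> future =
                    BackendServiceProxy.getInstance().fetchTableStructureAsync(brpcAddress, request);
            InternalService.PFetchTableSchemaResult result = future.get();
            if (TStatusCode.findByValue(result.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new RuntimeException("fetch_table_schema failed");
            }
            // One name/type pair per column, as declared in PFetchTableSchemaResult.
            for (int i = 0; i < result.getColumnNums(); ++i) {
                System.out.println(result.getColumnNames(i) + " : " + result.getColumnTypes(i));
            }
        }
    }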