From abd10f0f3e212dc5d02a1bc049530ec29d605fb8 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 29 Jun 2022 11:04:01 +0800 Subject: [PATCH] [feature-wip](multi-catalog) Impl FileScanNode in be (#10402) Define a new file scan node for HMS tables in BE. This file scan node differs from the broker scan node as below: 1. Broker scan node defines both src slots and dest slots, so it performs two memory copies: first from the file to the src slot, and second from the src slot to the dest slot. FileScanNode performs only one memory copy, directly from the file to the dest slot. 2. Broker scan node reads all fields in the file into src slots, while FileScanNode reads only the needed fields. 3. Broker scan node converts values to string type for the src slots and then uses casts to convert them to the dest slot types, but FileScanNode produces the final types directly. For now FileScanNode is standalone code, but we will unify the file scan and broker scan in the future. --- .gitignore | 1 + be/src/exec/exec_node.cpp | 8 + be/src/exec/text_converter.h | 3 + be/src/exec/text_converter.hpp | 10 +- be/src/vec/CMakeLists.txt | 6 +- be/src/vec/exec/file_arrow_scanner.cpp | 225 ++++++++++++ be/src/vec/exec/file_arrow_scanner.h | 100 ++++++ be/src/vec/exec/file_scan_node.cpp | 334 ++++++++++++++++++ be/src/vec/exec/file_scan_node.h | 118 +++++++ be/src/vec/exec/file_scanner.cpp | 209 +++++++++++ be/src/vec/exec/file_scanner.h | 94 +++++ be/src/vec/exec/file_text_scanner.cpp | 301 ++++++++++++++++ be/src/vec/exec/file_text_scanner.h | 69 ++++ .../doris/analysis/DescriptorTable.java | 16 +- .../apache/doris/analysis/StmtRewriter.java | 4 +- .../org/apache/doris/analysis/TableRef.java | 6 +- .../doris/analysis/TupleDescriptor.java | 3 +- .../catalog/HiveMetaStoreClientHelper.java | 4 + .../org/apache/doris/catalog/TableIf.java | 9 +- .../doris/catalog/external/ExternalTable.java | 6 + .../catalog/external/HMSExternalTable.java | 40 ++- .../apache/doris/common/util/BrokerUtil.java | 18 +- .../doris/datasource/DataSourceMgr.java | 4 + .../datasource/HMSExternalDataSource.java | 45 +-- .../apache/doris/planner/BrokerScanNode.java | 3 +- .../apache/doris/planner/HudiScanNode.java | 7 +- .../doris/planner/SingleNodePlanner.java | 4 + .../external/ExternalFileScanNode.java | 204 ++++------- .../external/ExternalFileScanProvider.java | 4 +- .../external/ExternalHiveScanProvider.java | 10 +- .../external/ExternalHudiScanProvider.java | 9 + .../external/ExternalIcebergScanProvider.java | 22 +- .../doris/statistics/StatisticalType.java | 1 + gensrc/thrift/PlanNodes.thrift | 50 ++- 34 files changed, 1746 insertions(+), 201 deletions(-) create mode 100644 be/src/vec/exec/file_arrow_scanner.cpp create mode 100644 be/src/vec/exec/file_arrow_scanner.h create mode 100644 be/src/vec/exec/file_scan_node.cpp create mode 100644 be/src/vec/exec/file_scan_node.h create mode 100644 be/src/vec/exec/file_scanner.cpp create mode 100644 be/src/vec/exec/file_scanner.h create mode 100644 be/src/vec/exec/file_text_scanner.cpp create mode 100644 be/src/vec/exec/file_text_scanner.h diff --git a/.gitignore b/.gitignore index 80e2ae605a..03a2c7339f 100644 --- a/.gitignore +++ b/.gitignore @@ -71,6 +71,7 @@ fe/fe-core/src/main/resources/static/ # BE be/build*/ +be/cmake-build*/ be/ut_build*/ be/src/gen_cpp/*.[cc, cpp, h] be/src/gen_cpp/opcode diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index b2de6bc180..54d2fdbea6 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -60,6 +60,7 @@ #include "util/debug_util.h" #include 
"util/runtime_profile.h" #include "vec/core/block.h" +#include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" @@ -393,6 +394,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::TABLE_FUNCTION_NODE: case TPlanNodeType::BROKER_SCAN_NODE: case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: + case TPlanNodeType::FILE_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -555,6 +557,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); + case TPlanNodeType::FILE_SCAN_NODE: + *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + + return Status::OK(); + case TPlanNodeType::REPEAT_NODE: if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::VRepeatNode(pool, tnode, descs)); @@ -664,6 +671,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); } void ExecNode::try_do_aggregate_serde_improve() { diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h index b729d2012d..18f1e4185f 100644 --- a/be/src/exec/text_converter.h +++ b/be/src/exec/text_converter.h @@ -53,6 +53,9 @@ public: bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, bool copy_string, bool need_escape); + bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr, + const char* data, size_t len, bool copy_string, bool need_escape); + // Removes escape characters from len characters of the null-terminated string src, // and copies the unescaped string into dest, changing *len to the unescaped length. // No null-terminator is added to dest. diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 0d6297f501..b931d4c560 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -200,6 +200,12 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, col_ptr = &nullable_column->get_nested_column(); } } + return write_vec_column(slot_desc, col_ptr, data, len, copy_string, need_escape); +} + +inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc, + vectorized::IColumn* col_ptr, const char* data, + size_t len, bool copy_string, bool need_escape) { StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; // Parse the raw-text data. Translate the text string to internal format. 
@@ -209,6 +215,7 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, HyperLogLog(Slice(data, len))); break; } + case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { if (need_escape) { @@ -305,8 +312,7 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, if (parse_result == StringParser::PARSE_FAILURE) { if (true == slot_desc->is_nullable()) { - auto* nullable_column = - reinterpret_cast(column_ptr->get()); + auto* nullable_column = reinterpret_cast(col_ptr); size_t size = nullable_column->get_null_map_data().size(); doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data(); null_map_data[size - 1] = 1; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index ad03c00b81..b32142ee80 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -206,7 +206,11 @@ set(VEC_FILES runtime/vfile_result_writer.cpp runtime/vpartition_info.cpp utils/arrow_column_to_doris_column.cpp - runtime/vsorted_run_merger.cpp) + runtime/vsorted_run_merger.cpp + exec/file_arrow_scanner.cpp + exec/file_scanner.cpp + exec/file_scan_node.cpp + exec/file_text_scanner.cpp) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp new file mode 100644 index 0000000000..57defdb8ef --- /dev/null +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/file_arrow_scanner.h" + +#include "exec/arrow/parquet_reader.h" +#include "io/buffered_reader.h" +#include "io/hdfs_reader_writer.h" +#include "runtime/descriptors.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +namespace doris::vectorized { + +FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), + _cur_file_reader(nullptr), + _cur_file_eof(false), + _batch(nullptr), + _arrow_batch_cur_idx(0) {} + +FileArrowScanner::~FileArrowScanner() { + close(); +} + +Status FileArrowScanner::_open_next_reader() { + // open_file_reader + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } + + while (true) { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TFileRangeDesc& range = _ranges[_next_range++]; + std::unique_ptr file_reader; + + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, + range.start_offset, &hdfs_reader)); + file_reader.reset(new BufferedReader(_profile, hdfs_reader)); + RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { + file_reader->close(); + continue; + } + + int32_t num_of_columns_from_file = _file_slot_descs.size(); + + _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), + num_of_columns_from_file); + + Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone()); + + if (status.is_end_of_file()) { + continue; + } else { + if (!status.ok()) { + std::stringstream ss; + ss << " file: " << range.path << " error:" << status.get_error_msg(); + return Status::InternalError(ss.str()); + } else { + return status; + } + } + } +} + +Status FileArrowScanner::open() { + RETURN_IF_ERROR(FileScanner::open()); + if (_ranges.empty()) { + return Status::OK(); + } + return Status::OK(); +} + +// get next available arrow batch +Status FileArrowScanner::_next_arrow_batch() { + _arrow_batch_cur_idx = 0; + // first, init file reader + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + } + // second, loop until find available arrow batch or EOF + while (!_scanner_eof) { + RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); + if (_cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + continue; + } + if (_batch->num_rows() == 0) { + continue; + } + return Status::OK(); + } + return Status::EndOfFile("EOF"); +} + +Status FileArrowScanner::_init_arrow_batch_if_necessary() { + // 1. init batch if first time + // 2. 
reset reader if end of file + Status status; + if (_scanner_eof) { + return Status::EndOfFile("EOF"); + } + if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { + return _next_arrow_batch(); + } + return status; +} + +Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) { + SCOPED_TIMER(_read_timer); + // init arrow batch + { + Status st = _init_arrow_batch_if_necessary(); + if (!st.ok()) { + if (!st.is_end_of_file()) { + return st; + } + *eof = true; + return Status::OK(); + } + } + + RETURN_IF_ERROR(init_block(block)); + // convert arrow batch to block until reach the batch_size + while (!_scanner_eof) { + // cast arrow type to PT0 and append it to block + // for example: arrow::Type::INT16 => TYPE_SMALLINT + RETURN_IF_ERROR(_append_batch_to_block(block)); + // finalize the block if full + if (_rows >= _state->batch_size()) { + break; + } + auto status = _next_arrow_batch(); + // if ok, append the batch to the columns + if (status.ok()) { + continue; + } + // return error if not EOF + if (!status.is_end_of_file()) { + return status; + } + _cur_file_eof = true; + break; + } + + return finalize_block(block, eof); +} + +Status FileArrowScanner::_append_batch_to_block(Block* block) { + size_t num_elements = std::min((_state->batch_size() - block->rows()), + (_batch->num_rows() - _arrow_batch_cur_idx)); + for (auto i = 0; i < _file_slot_descs.size(); ++i) { + SlotDescriptor* slot_desc = _file_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto* array = _batch->GetColumnByName(slot_desc->col_name()).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column( + array, _arrow_batch_cur_idx, column_with_type_and_name.column, + column_with_type_and_name.type, num_elements, _state->timezone())); + } + _rows += num_elements; + _arrow_batch_cur_idx += num_elements; + return Status::OK(); +} + +void FileArrowScanner::close() { + FileScanner::close(); + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } +} + +VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} + +ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); +} + +VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} + +ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h new file mode 100644 index 0000000000..3f0ee32082 --- /dev/null +++ b/be/src/vec/exec/file_arrow_scanner.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "util/runtime_profile.h" +#include "vec/exec/file_scanner.h" + +namespace doris::vectorized { + +// VArrow scanner convert the data read from orc|parquet to doris's columns. +class FileArrowScanner : public FileScanner { +public: + FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~FileArrowScanner() override; + + // Open this scanner, will initialize information need to + Status open() override; + + Status get_next(Block* block, bool* eof) override; + + void close() override; + +protected: + virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) = 0; + +private: + // Read next buffer from reader + Status _open_next_reader(); + Status _next_arrow_batch(); + Status _init_arrow_batch_if_necessary(); + Status _append_batch_to_block(Block* block); + +private: + // Reader + ArrowReaderWrap* _cur_file_reader; + bool _cur_file_eof; // is read over? + std::shared_ptr _batch; + size_t _arrow_batch_cur_idx; +}; + +class VFileParquetScanner final : public FileArrowScanner { +public: + VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~VFileParquetScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; +}; + +class VFileORCScanner final : public FileArrowScanner { +public: + VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~VFileORCScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp new file mode 100644 index 0000000000..b01fc739e2 --- /dev/null +++ b/be/src/vec/exec/file_scan_node.cpp @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/file_scan_node.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/mem_tracker.h" +#include "runtime/runtime_state.h" +#include "runtime/string_value.h" +#include "runtime/tuple.h" +#include "runtime/tuple_row.h" +#include "util/runtime_profile.h" +#include "util/thread.h" +#include "util/types.h" +#include "vec/exec/file_arrow_scanner.h" +#include "vec/exec/file_text_scanner.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { + +FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _tuple_id(tnode.file_scan_node.tuple_id), + _runtime_state(nullptr), + _tuple_desc(nullptr), + _num_running_scanners(0), + _scan_finished(false), + _max_buffered_batches(32), + _wait_scanner_timer(nullptr) {} + +Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::init(tnode, state)); + auto& file_scan_node = tnode.file_scan_node; + + if (file_scan_node.__isset.pre_filter_exprs) { + _pre_filter_texprs = file_scan_node.pre_filter_exprs; + } + + return Status::OK(); +} + +Status FileScanNode::prepare(RuntimeState* state) { + VLOG_QUERY << "FileScanNode prepare"; + RETURN_IF_ERROR(ScanNode::prepare(state)); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + // get tuple desc + _runtime_state = state; + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id; + return Status::InternalError(ss.str()); + } + + // Initialize slots map + for (auto slot : _tuple_desc->slots()) { + auto pair = _slots_map.emplace(slot->col_name(), slot); + if (!pair.second) { + std::stringstream ss; + ss << "Failed to insert slot, col_name=" << slot->col_name(); + return Status::InternalError(ss.str()); + } + } + + // Profile + _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); + + return Status::OK(); +} + +Status FileScanNode::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + RETURN_IF_CANCELLED(state); + + RETURN_IF_ERROR(start_scanners()); + + return Status::OK(); +} + +Status FileScanNode::start_scanners() { + { + std::unique_lock l(_batch_queue_lock); + _num_running_scanners = 1; + } + _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size()); + return Status::OK(); +} + +Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + // check if CANCELLED. 
+ if (state->is_cancelled()) { + std::unique_lock l(_batch_queue_lock); + if (update_status(Status::Cancelled("Cancelled"))) { + // Notify all scanners + _queue_writer_cond.notify_all(); + } + } + + if (_scan_finished.load()) { + *eos = true; + return Status::OK(); + } + + const int batch_size = _runtime_state->batch_size(); + while (true) { + std::shared_ptr scanner_block; + { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && _block_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::Cancelled("Cancelled"))) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_block_queue.empty()) { + scanner_block = _block_queue.front(); + _block_queue.pop_front(); + } + } + + // All scanner has been finished, and all cached batch has been read + if (!scanner_block) { + if (_mutable_block && !_mutable_block->empty()) { + *block = _mutable_block->to_block(); + reached_limit(block, eos); + LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit."; + } + _scan_finished.store(true); + *eos = true; + return Status::OK(); + } + // notify one scanner + _queue_writer_cond.notify_one(); + + if (UNLIKELY(!_mutable_block)) { + _mutable_block.reset(new MutableBlock(scanner_block->clone_empty())); + } + + if (_mutable_block->rows() + scanner_block->rows() < batch_size) { + // merge scanner_block into _mutable_block + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + continue; + } else { + if (_mutable_block->empty()) { + // directly use scanner_block + *block = *scanner_block; + } else { + // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next. 
+ *block = _mutable_block->to_block(); + _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns()); + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + } + break; + } + } + + reached_limit(block, eos); + if (*eos) { + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + LOG(INFO) << "FileScanNode ReachedLimit."; + } else { + *eos = false; + } + + return Status::OK(); +} + +Status FileScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + _queue_reader_cond.notify_all(); + for (int i = 0; i < _scanner_threads.size(); ++i) { + _scanner_threads[i].join(); + } + + // Close + _batch_queue.clear(); + return ExecNode::close(state); +} + +Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) { + //create scanner object and open + std::unique_ptr scanner = create_scanner(scan_range, counter); + RETURN_IF_ERROR(scanner->open()); + bool scanner_eof = false; + while (!scanner_eof) { + RETURN_IF_CANCELLED(_runtime_state); + // If we have finished all works + if (_scan_finished.load() || !_process_status.ok()) { + return Status::OK(); + } + + std::shared_ptr block(new vectorized::Block()); + RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); + if (block->rows() == 0) { + continue; + } + auto old_rows = block->rows(); + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), + _tuple_desc->slots().size())); + counter->num_rows_unselected += old_rows - block->rows(); + if (block->rows() == 0) { + continue; + } + + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() && + // stop pushing more batch if + // 1. too many batches in queue, or + // 2. at least one batch in queue and memory exceed limit. + (_block_queue.size() >= _max_buffered_batches || + (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) { + _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); + } + // Process already set failed, so we just return OK + if (!_process_status.ok()) { + return Status::OK(); + } + // Scan already finished, just return + if (_scan_finished.load()) { + return Status::OK(); + } + // Runtime state is canceled, just return cancel + if (_runtime_state->is_cancelled()) { + return Status::Cancelled("Cancelled"); + } + // Queue size Must be smaller than _max_buffered_batches + _block_queue.push_back(block); + + // Notify reader to + _queue_reader_cond.notify_one(); + } + return Status::OK(); +} + +void FileScanNode::scanner_worker(int start_idx, int length) { + Thread::set_self_name("file_scanner"); + Status status = Status::OK(); + ScannerCounter counter; + for (int i = 0; i < length && status.ok(); ++i) { + const TFileScanRange& scan_range = + _scan_ranges[start_idx + i].scan_range.ext_scan_range.file_scan_range; + status = scanner_scan(scan_range, &counter); + if (!status.ok()) { + LOG(WARNING) << "Scanner[" << start_idx + i + << "] process failed. 
status=" << status.get_error_msg(); + } + } + + // Update stats + _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); + _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); + + // scanner is going to finish + { + std::lock_guard l(_batch_queue_lock); + if (!status.ok()) { + update_status(status); + } + // This scanner will finish + _num_running_scanners--; + } + _queue_reader_cond.notify_all(); + // If one scanner failed, others don't need scan any more + if (!status.ok()) { + _queue_writer_cond.notify_all(); + } +} + +std::unique_ptr FileScanNode::create_scanner(const TFileScanRange& scan_range, + ScannerCounter* counter) { + FileScanner* scan = nullptr; + switch (scan_range.ranges[0].format_type) { + case TFileFormatType::FORMAT_PARQUET: + scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + break; + case TFileFormatType::FORMAT_ORC: + scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + break; + + default: + scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + } + std::unique_ptr scanner(scan); + return scanner; +} + +// This function is called after plan node has been prepared. +Status FileScanNode::set_scan_ranges(const std::vector& scan_ranges) { + _scan_ranges = scan_ranges; + return Status::OK(); +} + +void FileScanNode::debug_string(int ident_level, std::stringstream* out) const { + (*out) << "FileScanNode"; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h new file mode 100644 index 0000000000..5106d654cc --- /dev/null +++ b/be/src/vec/exec/file_scan_node.h @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/scan_node.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "runtime/descriptors.h" +#include "vec/exec/file_scanner.h" +namespace doris { + +class RuntimeState; +class Status; + +namespace vectorized { +class FileScanNode final : public ScanNode { +public: + FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~FileScanNode() override = default; + + // Called after create this scan node + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + // Prepare partition infos & set up timer + Status prepare(RuntimeState* state) override; + + // Start file scan using ParquetScanner or OrcScanner. 
+ Status open(RuntimeState* state) override; + + // Fill the next row batch by calling next() on the scanner, + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { + return Status::NotSupported("Not Implemented FileScanNode::get_next."); + } + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + // Close the scanner, and report errors. + Status close(RuntimeState* state) override; + + // No use + Status set_scan_ranges(const std::vector& scan_ranges) override; + +private: + // Write debug string of this into out. + void debug_string(int indentation_level, std::stringstream* out) const override; + + // Update process status to one failed status, + // NOTE: Must hold the mutex of this scan node + bool update_status(const Status& new_status) { + if (_process_status.ok()) { + _process_status = new_status; + return true; + } + return false; + } + + std::unique_ptr create_scanner(const TFileScanRange& scan_range, + ScannerCounter* counter); + + Status start_scanners(); + + void scanner_worker(int start_idx, int length); + // Scan one range + Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter); + + TupleId _tuple_id; + RuntimeState* _runtime_state; + TupleDescriptor* _tuple_desc; + std::map _slots_map; + std::vector _scan_ranges; + + std::mutex _batch_queue_lock; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::deque> _batch_queue; + + int _num_running_scanners; + + std::atomic _scan_finished; + + Status _process_status; + + std::vector _scanner_threads; + + int _max_buffered_batches; + + // The origin preceding filter exprs. + // These exprs will be converted to expr context + // in XXXScanner. + // Because the row descriptor used for these exprs is `src_row_desc`, + // which is initialized in XXXScanner. + std::vector _pre_filter_texprs; + + RuntimeProfile::Counter* _wait_scanner_timer; + + std::deque> _block_queue; + std::unique_ptr _mutable_block; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp new file mode 100644 index 0000000000..106b94cd73 --- /dev/null +++ b/be/src/vec/exec/file_scanner.cpp @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "file_scanner.h" + +#include + +#include + +#include "common/logging.h" +#include "common/utils.h" +#include "exec/exec_node.h" +#include "exec/text_converter.hpp" +#include "exprs/expr_context.h" +#include "runtime/descriptors.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" + +namespace doris::vectorized { + +FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : _state(state), + _params(params), + _ranges(ranges), + _next_range(0), + _counter(counter), +#if BE_TEST + _mem_tracker(new MemTracker()), +#else + _mem_tracker(MemTracker::create_tracker( + -1, state->query_type() == TQueryType::LOAD + ? "FileScanner:" + std::to_string(state->load_job_id()) + : "FileScanner:Select")), +#endif + _mem_pool(std::make_unique(_mem_tracker.get())), + _pre_filter_texprs(pre_filter_texprs), + _profile(profile), + _rows_read_counter(nullptr), + _read_timer(nullptr), + _scanner_eof(false) { + _text_converter.reset(new (std::nothrow) TextConverter('\\')); +} + +Status FileScanner::open() { + RETURN_IF_ERROR(_init_expr_ctxes()); + + _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); + + return Status::OK(); +} + +Status FileScanner::_init_expr_ctxes() { + const TupleDescriptor* src_tuple_desc = + _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); + if (src_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; + return Status::InternalError(ss.str()); + } + DCHECK(!_ranges.empty()); + + std::map _full_src_index_map; + std::map _full_src_slot_map; + int index = 0; + for (const auto& slot_desc : src_tuple_desc->slots()) { + _full_src_slot_map.emplace(slot_desc->id(), slot_desc); + _full_src_index_map.emplace(slot_desc->id(), index++); + } + + _num_of_columns_from_file = _params.num_of_columns_from_file; + for (const auto& slot_info : _params.required_slots) { + auto slot_id = slot_info.slot_id; + auto it = _full_src_slot_map.find(slot_id); + if (it == std::end(_full_src_slot_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_id; + return Status::InternalError(ss.str()); + } + _required_slot_descs.emplace_back(it->second); + if (slot_info.is_file_slot) { + _file_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _file_slot_index_map.emplace(slot_id, iti->second); + } else { + _partition_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); + } + } + + _row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector({_params.src_tuple_id}), + std::vector({false}))); + + // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor + if (!_pre_filter_texprs.empty()) { + // for vectorized, preceding filter exprs should be compounded to one passed from fe. 
+ DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + } + + return Status::OK(); +} + +void FileScanner::close() { + if (_vpre_filter_ctx_ptr) { + (*_vpre_filter_ctx_ptr)->close(_state); + } +} + +Status FileScanner::init_block(vectorized::Block* block) { + (*block).clear(); + _rows = 0; + for (const auto& slot_desc : _required_slot_descs) { + if (slot_desc == nullptr) { + continue; + } + auto is_nullable = slot_desc->is_nullable(); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), + is_nullable); + if (data_type == nullptr) { + return Status::NotSupported( + fmt::format("Not support type for column:{}", slot_desc->col_name())); + } + MutableColumnPtr data_column = data_type->create_column(); + (*block).insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + return Status::OK(); +} + +Status FileScanner::_filter_block(vectorized::Block* _block) { + auto origin_column_num = (*_block).columns(); + // filter block + auto old_rows = (*_block).rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, _block, + origin_column_num)); + _counter->num_rows_unselected += old_rows - (*_block).rows(); + return Status::OK(); +} + +Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) { + *eof = _scanner_eof; + RETURN_IF_ERROR(_fill_columns_from_path(_block)); + if (LIKELY(_rows > 0)) { + RETURN_IF_ERROR(_filter_block(_block)); + } + + return Status::OK(); +} + +Status FileScanner::_fill_columns_from_path(vectorized::Block* _block) { + const TFileRangeDesc& range = _ranges.at(_next_range - 1); + if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { + size_t rows = _rows; + + for (const auto& slot_desc : _partition_slot_descs) { + if (slot_desc == nullptr) continue; + auto it = _partition_slot_index_map.find(slot_desc->id()); + if (it == std::end(_partition_slot_index_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id(); + return Status::InternalError(ss.str()); + } + const std::string& column_from_path = range.columns_from_path[it->second]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(col_ptr); + nullable_column->get_null_map_data().push_back(0); + col_ptr = &nullable_column->get_nested_column(); + } + + for (size_t j = 0; j < rows; ++j) { + _text_converter->write_vec_column(slot_desc, col_ptr, + const_cast(column_from_path.c_str()), + column_from_path.size(), true, false); + } + } + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h new file mode 100644 index 0000000000..cb21b876d5 --- /dev/null +++ b/be/src/vec/exec/file_scanner.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/text_converter.h" +#include "exprs/expr.h" +#include "util/runtime_profile.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { +class FileScanner { +public: + FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + virtual ~FileScanner() = default; + + // Open this scanner, will initialize information need to + virtual Status open(); + + // Get next block + virtual Status get_next(vectorized::Block* block, bool* eof) { + return Status::NotSupported("Not Implemented get block"); + } + + // Close this scanner + virtual void close() = 0; + +protected: + Status finalize_block(vectorized::Block* dest_block, bool* eof); + Status init_block(vectorized::Block* block); + + std::unique_ptr _text_converter; + + RuntimeState* _state; + const TFileScanRangeParams& _params; + + const std::vector& _ranges; + int _next_range; + // used for process stat + ScannerCounter* _counter; + + // Used for constructing tuple + std::vector _required_slot_descs; + std::vector _file_slot_descs; + std::map _file_slot_index_map; + std::vector _partition_slot_descs; + std::map _partition_slot_index_map; + + std::unique_ptr _row_desc; + + std::shared_ptr _mem_tracker; + // Mem pool used to allocate _src_tuple and _src_tuple_row + std::unique_ptr _mem_pool; + + const std::vector _pre_filter_texprs; + + // Profile + RuntimeProfile* _profile; + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + + bool _scanner_eof = false; + int _rows = 0; + + std::unique_ptr _vpre_filter_ctx_ptr; + int _num_of_columns_from_file; + +private: + Status _init_expr_ctxes(); + Status _filter_block(vectorized::Block* output_block); + Status _fill_columns_from_path(vectorized::Block* output_block); +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp new file mode 100644 index 0000000000..03e2dd9275 --- /dev/null +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/file_text_scanner.h" + +#include +#include + +#include + +#include "exec/exec_node.h" +#include "exec/plain_text_line_reader.h" +#include "exec/text_converter.h" +#include "exec/text_converter.hpp" +#include "exprs/expr_context.h" +#include "io/buffered_reader.h" +#include "io/hdfs_reader_writer.h" +#include "util/types.h" +#include "util/utf8_check.h" + +namespace doris::vectorized { + +FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_line_reader_eof(false), + _skip_lines(0), + _success(false) + +{ + if (params.__isset.text_params) { + auto text_params = params.text_params; + if (text_params.__isset.column_separator_str) { + _value_separator = text_params.column_separator_str; + _value_separator_length = _value_separator.length(); + } + if (text_params.__isset.line_delimiter_str) { + _line_delimiter = text_params.line_delimiter_str; + _line_delimiter_length = _line_delimiter.length(); + } + } +} + +FileTextScanner::~FileTextScanner() { + close(); +} + +Status FileTextScanner::open() { + RETURN_IF_ERROR(FileScanner::open()); + + if (_ranges.empty()) { + return Status::OK(); + } + _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); + return Status::OK(); +} + +void FileTextScanner::close() { + FileScanner::close(); + + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } +} + +Status FileTextScanner::get_next(Block* block, bool* eof) { + SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(init_block(block)); + + const int batch_size = _state->batch_size(); + + while (_rows < batch_size && !_scanner_eof) { + if (_cur_line_reader == nullptr || _cur_line_reader_eof) { + RETURN_IF_ERROR(_open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + } + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); + std::unique_ptr u_ptr; + u_ptr.reset(ptr); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + { + COUNTER_UPDATE(_rows_read_counter, 1); + RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); + } + } + + return finalize_block(block, eof); +} + +Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) { + RETURN_IF_ERROR(_line_split_to_values(line)); + if (!_success) { + // If not success, which means we met an invalid row, return. 
+ return Status::OK(); + } + + for (int i = 0; i < _split_values.size(); ++i) { + auto slot_desc = _file_slot_descs[i]; + const Slice& value = _split_values[i]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(col_ptr); + nullable_column->get_null_map_data().push_back(0); + col_ptr = &nullable_column->get_nested_column(); + } + + if (value.size == 2 && value.data[0] == '\\' && value[1] == 'N') { + col_ptr->insert_default(); + continue; + } + _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false); + } + _rows++; + return Status::OK(); +} + +Status FileTextScanner::_open_next_reader() { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_line_reader()); + _next_range++; + + return Status::OK(); +} + +Status FileTextScanner::_open_file_reader() { + const TFileRangeDesc& range = _ranges[_next_range]; + + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, + range.start_offset, &hdfs_reader)); + _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader)); + return _cur_file_reader->open(); +} + +Status FileTextScanner::_open_line_reader() { + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } + + const TFileRangeDesc& range = _ranges[_next_range]; + int64_t size = range.size; + if (range.start_offset != 0) { + if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) { + std::stringstream ss; + ss << "For now we do not support split compressed file"; + return Status::InternalError(ss.str()); + } + size += 1; + // not first range will always skip one line + _skip_lines = 1; + } + + // open line reader + switch (range.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, + _line_delimiter, _line_delimiter_length); + break; + default: { + std::stringstream ss; + ss << "Unknown format type, cannot init line reader, type=" << range.format_type; + return Status::InternalError(ss.str()); + } + } + + _cur_line_reader_eof = false; + + return Status::OK(); +} + +Status FileTextScanner::_line_split_to_values(const Slice& line) { + if (!validate_utf8(line.data, line.size)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + []() -> std::string { return "Unable to display"; }, + []() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "{}", "Unable to display"); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + _counter->num_rows_filtered++; + _success = false; + return Status::OK(); + } + + RETURN_IF_ERROR(_split_line(line)); + + _success = true; + return Status::OK(); +} + +Status FileTextScanner::_split_line(const Slice& line) { + _split_values.clear(); + std::vector tmp_split_values; + tmp_split_values.reserve(_num_of_columns_from_file); + + const char* value = line.data; + size_t start = 0; // point to the start pos of next col value. + size_t curpos = 0; // point to the start pos of separator matching sequence. + size_t p1 = 0; // point to the current pos of separator matching sequence. + size_t non_space = 0; // point to the last pos of non_space charactor. 
+ + // Separator: AAAA + // + // p1 + // ▼ + // AAAA + // 1000AAAA2000AAAA + // ▲ ▲ + // Start │ + // curpos + + while (curpos < line.size) { + if (*(value + curpos + p1) != _value_separator[p1]) { + // Not match, move forward: + curpos += (p1 == 0 ? 1 : p1); + p1 = 0; + } else { + p1++; + if (p1 == _value_separator_length) { + // Match a separator + non_space = curpos; + // Trim tailing spaces. Be consistent with hive and trino's behavior. + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + tmp_split_values.emplace_back(value + start, non_space - start); + start = curpos + _value_separator_length; + curpos = start; + p1 = 0; + non_space = 0; + } + } + } + + CHECK(curpos == line.size) << curpos << " vs " << line.size; + non_space = curpos; + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + + tmp_split_values.emplace_back(value + start, non_space - start); + for (const auto& slot : _file_slot_descs) { + auto it = _file_slot_index_map.find(slot->id()); + if (it == std::end(_file_slot_index_map)) { + std::stringstream ss; + ss << "Unknown _file_slot_index_map, slot_id=" << slot->id(); + return Status::InternalError(ss.str()); + } + int index = it->second; + CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file; + _split_values.emplace_back(tmp_split_values[index]); + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.h b/be/src/vec/exec/file_text_scanner.h new file mode 100644 index 0000000000..fc3ba6709c --- /dev/null +++ b/be/src/vec/exec/file_text_scanner.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/consts.h" +#include "exec/base_scanner.h" +#include "exec/decompressor.h" +#include "exec/line_reader.h" +#include "exec/plain_binary_line_reader.h" +#include "exec/plain_text_line_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "io/file_factory.h" +#include "io/file_reader.h" +#include "vec/exec/file_scanner.h" + +namespace doris::vectorized { +class FileTextScanner final : public FileScanner { +public: + FileTextScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~FileTextScanner() override; + + Status open() override; + + Status get_next(Block* block, bool* eof) override; + void close() override; + +private: + Status _fill_file_columns(const Slice& line, vectorized::Block* _block); + Status _open_next_reader(); + Status _open_file_reader(); + Status _open_line_reader(); + Status _line_split_to_values(const Slice& line); + Status _split_line(const Slice& line); + // Reader + std::shared_ptr _cur_file_reader; + LineReader* _cur_line_reader; + bool _cur_line_reader_eof; + + // When we fetch range start from 0, header_type="csv_with_names" skip first line + // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line + // When we fetch range doesn't start + int _skip_lines; + std::vector _split_values; + std::string _value_separator; + std::string _line_delimiter; + int _value_separator_length; + int _line_delimiter_length; + + bool _success; +}; +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index ed7ba00dd6..55b66593e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -20,7 +20,7 @@ package org.apache.doris.analysis; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.IdGenerator; import org.apache.doris.thrift.TDescriptorTable; @@ -46,7 +46,7 @@ public class DescriptorTable { private final HashMap tupleDescs = new HashMap(); // List of referenced tables with no associated TupleDescriptor to ship to the BE. // For example, the output table of an insert query. - private final List referencedTables = new ArrayList
(); + private final List referencedTables = new ArrayList(); private final IdGenerator tupleIdGenerator = TupleId.createGenerator(); private final IdGenerator slotIdGenerator = SlotId.createGenerator(); private final HashMap slotDescs = Maps.newHashMap(); @@ -121,7 +121,7 @@ public class DescriptorTable { return tupleDescs.values(); } - public void addReferencedTable(Table table) { + public void addReferencedTable(TableIf table) { referencedTables.add(table); } @@ -151,7 +151,7 @@ public class DescriptorTable { public TDescriptorTable toThrift() { TDescriptorTable result = new TDescriptorTable(); - HashSet
referencedTbls = Sets.newHashSet(); + HashSet referencedTbls = Sets.newHashSet(); for (TupleDescriptor tupleD : tupleDescs.values()) { // inline view of a non-constant select has a non-materialized tuple descriptor // in the descriptor table just for type checking, which we need to skip @@ -161,7 +161,7 @@ public class DescriptorTable { // but its table has no id if (tupleD.getTable() != null && tupleD.getTable().getId() >= 0) { - referencedTbls.add((Table) tupleD.getTable()); + referencedTbls.add(tupleD.getTable()); } for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) { result.addToSlotDescriptors(slotD.toThrift()); @@ -169,11 +169,9 @@ public class DescriptorTable { } } - for (Table table : referencedTables) { - referencedTbls.add(table); - } + referencedTbls.addAll(referencedTables); - for (Table tbl : referencedTbls) { + for (TableIf tbl : referencedTbls) { result.addToTableDescriptors(tbl.toThrift()); } return result; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index 0bb58a4778..2d97c54c41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -22,7 +22,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.TableAliasGenerator; @@ -1186,7 +1186,7 @@ public class StmtRewriter { } continue; } - Table table = tableRef.getTable(); + TableIf table = tableRef.getTable(); String dbName = tableRef.getName().getDb(); if (dbName == null) { dbName = analyzer.getDefaultDb(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 107a9a3637..01c816a920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -21,7 +21,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -300,8 +300,8 @@ public class TableRef implements ParseNode, Writable { return !correlatedTupleIds.isEmpty(); } - public Table getTable() { - return (Table) desc.getTable(); + public TableIf getTable() { + return desc.getTable(); } public void setUsingClause(List colNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java index 96aaa615cc..b75bb10af2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java @@ -22,7 +22,6 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.thrift.TTupleDescriptor; @@ -353,7 +352,7 @@ public class TupleDescriptor { if (slotDescriptor.getColumn() != null) { TupleDescriptor parent = 
slotDescriptor.getParent(); Preconditions.checkState(parent != null); - Table table = (Table) parent.getTable(); + TableIf table = parent.getTable(); Preconditions.checkState(table != null); Long tableId = table.getId(); Set columnNames = tableIdToColumnNames.get(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 46f1ea0b63..80d311fe54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -698,6 +698,10 @@ public class HiveMetaStoreClientHelper { return Type.DATE; case "timestamp": return Type.DATETIME; + case "float": + return Type.FLOAT; + case "double": + return Type.DOUBLE; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d2f1427e5c..b0066b28aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.alter.AlterCancelException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.TTableDescriptor; import java.util.List; import java.util.concurrent.TimeUnit; @@ -92,11 +93,14 @@ public interface TableIf { String getComment(boolean escapeQuota); + public TTableDescriptor toThrift(); + /** * Doris table type. */ public enum TableType { - MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, TABLE_VALUED_FUNCTION; + MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, + TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE; public String toEngineName() { switch (this) { @@ -122,6 +126,8 @@ public interface TableIf { return "Hudi"; case TABLE_VALUED_FUNCTION: return "Table_Valued_Function"; + case HMS_EXTERNAL_TABLE: + return "hms"; default: return null; } @@ -143,6 +149,7 @@ public interface TableIf { case HIVE: case HUDI: case TABLE_VALUED_FUNCTION: + case HMS_EXTERNAL_TABLE: return "EXTERNAL TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index a834e5508e..f43bd8edf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.TTableDescriptor; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; @@ -249,5 +250,10 @@ public class ExternalTable implements TableIf { @Override public String getComment(boolean escapeQuota) { return ""; + + } + + public TTableDescriptor toThrift() { + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 9c6e9b12fe..fd909ac514 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -21,11 +21,16 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.THiveTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -38,6 +43,13 @@ public class HMSExternalTable extends ExternalTable { private final String metastoreUri; private final String dbName; private org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + private DLAType dlaType = null; + + public enum DLAType { + HIVE, + HUDI, + ICEBERG + } /** * Create hive metastore external table. @@ -51,6 +63,7 @@ public class HMSExternalTable extends ExternalTable { super(id, name); this.dbName = dbName; this.metastoreUri = uri; + this.type = TableType.HMS_EXTERNAL_TABLE; init(); } @@ -58,11 +71,11 @@ public class HMSExternalTable extends ExternalTable { getRemoteTable(); if (remoteTable.getParameters().containsKey("table_type") && remoteTable.getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) { - type = TableType.ICEBERG; + dlaType = DLAType.ICEBERG; } else if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) { - type = TableType.HUDI; + dlaType = DLAType.HUDI; } else { - type = TableType.HIVE; + dlaType = DLAType.HIVE; } } @@ -92,6 +105,7 @@ public class HMSExternalTable extends ExternalTable { if (fullSchema == null) { synchronized (this) { if (fullSchema == null) { + fullSchema = new ArrayList<>(); try { for (FieldSchema field : HiveMetaStoreClientHelper.getSchema(dbName, name, metastoreUri)) { fullSchema.add(new Column(field.getName(), @@ -191,4 +205,24 @@ public class HMSExternalTable extends ExternalTable { public String getDbName() { return dbName; } + + /** + * get the dla type for scan node to get right information. 
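For reference, the classification that init() performs above can be read as one small pure function over the remote metastore table; a minimal sketch (the class and method names are illustrative, not part of the patch):

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.Table;

    public final class DlaTypeSketch {
        public enum DLAType { HIVE, HUDI, ICEBERG }

        // Mirrors HMSExternalTable.init(): Iceberg tables advertise themselves through the
        // "table_type" parameter, Hudi tables through a "hoodie" input format, and
        // everything else is treated as a plain Hive table.
        public static DLAType classify(Table remoteTable) {
            Map<String, String> params = remoteTable.getParameters();
            if (params.containsKey("table_type")
                    && params.get("table_type").equalsIgnoreCase("ICEBERG")) {
                return DLAType.ICEBERG;
            }
            if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) {
                return DLAType.HUDI;
            }
            return DLAType.HIVE;
        }
    }

Keeping this classification in a dedicated dlaType field (instead of overloading TableType as before) lets the table always report HMS_EXTERNAL_TABLE while the scan node still picks the right provider.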
+ */ + public DLAType getDlaType() { + return dlaType; + } + + @Override + public TTableDescriptor toThrift() { + THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + fullSchema.size(), 0, getName(), ""); + tTableDescriptor.setHiveTable(tHiveTable); + return tTableDescriptor; + } + + public String getMetastoreUri() { + return metastoreUri; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 60dd5a8b33..8367bc8b85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -49,7 +49,6 @@ import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPReadRequest; import org.apache.doris.thrift.TBrokerPWriteRequest; -import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TBrokerReadResponse; import org.apache.doris.thrift.TBrokerRenamePathRequest; import org.apache.doris.thrift.TBrokerVersion; @@ -91,25 +90,26 @@ public class BrokerUtil { public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; - public static void generateHdfsParam(Map properties, TBrokerRangeDesc rangeDesc) { - rangeDesc.setHdfsParams(new THdfsParams()); - rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>()); + public static THdfsParams generateHdfsParam(Map properties) { + THdfsParams tHdfsParams = new THdfsParams(); + tHdfsParams.setHdfsConf(new ArrayList<>()); for (Map.Entry property : properties.entrySet()) { if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) { - rangeDesc.hdfs_params.setFsName(property.getValue()); + tHdfsParams.setFsName(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) { - rangeDesc.hdfs_params.setUser(property.getValue()); + tHdfsParams.setUser(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) { - rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue()); + tHdfsParams.setHdfsKerberosPrincipal(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) { - rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue()); + tHdfsParams.setHdfsKerberosKeytab(property.getValue()); } else { THdfsConf hdfsConf = new THdfsConf(); hdfsConf.setKey(property.getKey()); hdfsConf.setValue(property.getValue()); - rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf); + tHdfsParams.hdfs_conf.add(hdfsConf); } } + return tHdfsParams; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java index 192d25b6a4..116efe65e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java @@ -117,6 +117,10 @@ public class DataSourceMgr implements Writable { return dbNames; } + public DataSourceIf getExternalDatasource(String name) { + return nameToCatalogs.get(name); + } + private void writeLock() { lock.writeLock().lock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java index b7097ff4cf..4d1a84c4b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java @@ -48,8 +48,9 @@ public class HMSExternalDataSource extends ExternalDataSource { //Cache of db name to db id. private ConcurrentHashMap dbNameToId = new ConcurrentHashMap(); - private AtomicLong nextId = new AtomicLong(0); + private static final AtomicLong nextId = new AtomicLong(0); + private boolean initialized = false; protected String hiveMetastoreUris; protected HiveMetaStoreClient client; @@ -57,34 +58,21 @@ public class HMSExternalDataSource extends ExternalDataSource { * Default constructor for HMSExternalDataSource. */ public HMSExternalDataSource(String name, Map props) { - setName(name); - getDsProperty().setProperties(props); - setType("hms"); - } - - /** - * Hive metastore data source implementation. - * - * @param hiveMetastoreUris e.g. thrift://127.0.0.1:9083 - */ - public HMSExternalDataSource(long id, String name, String type, DataSourceProperty dsProperty, - String hiveMetastoreUris) throws DdlException { - this.id = id; + this.id = nextId.incrementAndGet(); this.name = name; - this.type = type; - this.dsProperty = dsProperty; - this.hiveMetastoreUris = hiveMetastoreUris; - init(); + this.type = "hms"; + this.dsProperty = new DataSourceProperty(); + this.dsProperty.setProperties(props); + this.hiveMetastoreUris = props.getOrDefault("hive.metastore.uris", "thrift://127.0.0.1:9083"); } - private void init() throws DdlException { + private void init() { HiveConf hiveConf = new HiveConf(); hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreUris); try { client = new HiveMetaStoreClient(hiveConf); } catch (MetaException e) { LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage()); - throw new DdlException("Create HMSExternalDataSource failed.", e); } List allDatabases; try { @@ -102,8 +90,20 @@ public class HMSExternalDataSource extends ExternalDataSource { } } + /** + * Datasource can't be init when creating because the external datasource may depend on third system. + * So you have to make sure the client of third system is initialized before any method was called. + */ + private synchronized void makeSureInitialized() { + if (!initialized) { + init(); + initialized = true; + } + } + @Override public List listDatabaseNames(SessionContext ctx) { + makeSureInitialized(); try { List allDatabases = client.getAllDatabases(); // Update the db name to id map. 
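The makeSureInitialized() guard above amounts to a small lazy-holder pattern: constructing the datasource never contacts the metastore, and the client is built on first use. A hedged sketch of the same idea (class name illustrative; unlike init() above, it propagates MetaException instead of logging and continuing):

    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class LazyMetastoreClientSketch {
        private final String metastoreUris;
        private HiveMetaStoreClient client;
        private boolean initialized = false;

        public LazyMetastoreClientSketch(String metastoreUris) {
            // No network access here: creation must succeed even if the metastore is down.
            this.metastoreUris = metastoreUris;
        }

        private synchronized void makeSureInitialized() throws MetaException {
            if (!initialized) {
                HiveConf conf = new HiveConf();
                conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUris);
                client = new HiveMetaStoreClient(conf);   // first real contact with the metastore
                initialized = true;
            }
        }

        public List<String> listDatabaseNames() throws MetaException {
            makeSureInitialized();                         // every public method guards itself
            return client.getAllDatabases();
        }
    }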
@@ -119,6 +119,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public List listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); try { return client.getAllTables(dbName); } catch (MetaException e) { @@ -129,6 +130,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); try { return client.tableExists(dbName, tblName); } catch (TException e) { @@ -140,6 +142,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Nullable @Override public ExternalDatabase getDbNullable(String dbName) { + makeSureInitialized(); try { client.getDatabase(dbName); } catch (TException e) { @@ -156,6 +159,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Nullable @Override public ExternalDatabase getDbNullable(long dbId) { + makeSureInitialized(); for (Map.Entry entry : dbNameToId.entrySet()) { if (entry.getValue() == dbId) { return new HMSExternalDatabase(this, dbId, entry.getKey(), hiveMetastoreUris); @@ -230,6 +234,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public List getDbIds() { + makeSureInitialized(); return Lists.newArrayList(dbNameToId.values()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 16f86de29f..a9a640e38d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -605,7 +605,8 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setHeaderType(headerType); // set hdfs params for hdfs file type. if (brokerDesc.getFileType() == TFileType.FILE_HDFS) { - BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties()); + rangeDesc.setHdfsParams(tHdfsParams); } return rangeDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java index 909f5cbe6d..cb57f1ec7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java @@ -292,11 +292,9 @@ public class HudiScanNode extends BrokerScanNode { return; } - THdfsParams hdfsParams = new THdfsParams(); String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); String fsName = fullPath.replace(filePath, ""); - hdfsParams.setFsName(fsName); Log.debug("Hudi path's host is " + fsName); TFileFormatType fileFormatType = null; @@ -319,7 +317,7 @@ public class HudiScanNode extends BrokerScanNode { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, fileFormatType, partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc); - rangeDesc.setHdfsParams(hdfsParams); + rangeDesc.getHdfsParams().setFsName(fsName); rangeDesc.setReadByColumnDef(true); curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); @@ -350,7 +348,8 @@ public class HudiScanNode extends BrokerScanNode { // set hdfs params for hdfs file type. 
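With this refactor, generateHdfsParam is a pure builder that returns a THdfsParams instead of mutating a TBrokerRangeDesc in place, so the same helper can serve broker ranges and the new file ranges. A hypothetical caller, sketched under that assumption:

    import java.util.Map;
    import org.apache.doris.common.util.BrokerUtil;
    import org.apache.doris.thrift.TBrokerRangeDesc;
    import org.apache.doris.thrift.THdfsParams;

    final class HdfsParamsSketch {
        // Before: BrokerUtil.generateHdfsParam(properties, rangeDesc) filled the range in place.
        // After:  the helper returns the THdfsParams and the caller attaches it explicitly.
        static TBrokerRangeDesc buildRange(Map<String, String> properties) {
            TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
            THdfsParams hdfsParams = BrokerUtil.generateHdfsParam(properties);
            rangeDesc.setHdfsParams(hdfsParams);
            return rangeDesc;
        }
    }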
switch (brokerDesc.getFileType()) { case FILE_HDFS: - BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties()); + rangeDesc.setHdfsParams(tHdfsParams); break; default: break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 3e9a3ae137..748d33a0b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -63,6 +63,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.ExternalFileScanNode; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -1726,6 +1727,9 @@ public class SingleNodePlanner { scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction()); break; + case HMS_EXTERNAL_TABLE: + scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HMS_FILE_SCAN_NODE"); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 0c6e66e2d2..7b77dd457f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -19,17 +19,11 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -42,12 +36,15 @@ import org.apache.doris.resource.Tag; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; -import org.apache.doris.thrift.TBrokerRangeDesc; -import org.apache.doris.thrift.TBrokerScanNode; -import org.apache.doris.thrift.TBrokerScanRange; -import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanNode; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; @@ 
-61,7 +58,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; @@ -69,7 +65,6 @@ import org.apache.logging.log4j.Logger; import org.mortbay.log.Log; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -88,9 +83,8 @@ public class ExternalFileScanNode extends ExternalScanNode { private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; private static class ParamCreateContext { - public TBrokerScanRangeParams params; + public TFileScanRangeParams params; public TupleDescriptor srcTupleDescriptor; - public Map slotDescByName; } private static class BackendPolicy { @@ -135,16 +129,12 @@ public class ExternalFileScanNode extends ExternalScanNode { } } - private enum DLAType { - HIVE, - HUDI, - ICE_BERG - } - private final BackendPolicy backendPolicy = new BackendPolicy(); private final ParamCreateContext context = new ParamCreateContext(); + private final List partitionKeys = new ArrayList<>(); + private List scanRangeLocations; private final HMSExternalTable hmsTable; @@ -157,17 +147,17 @@ public class ExternalFileScanNode extends ExternalScanNode { public ExternalFileScanNode( PlanNodeId id, TupleDescriptor desc, - String planNodeName) throws MetaNotFoundException { - super(id, desc, planNodeName, StatisticalType.BROKER_SCAN_NODE); + String planNodeName) { - this.hmsTable = (HMSExternalTable) desc.getTable(); + super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE); - DLAType type = getDLAType(); - switch (type) { + this.hmsTable = (HMSExternalTable) this.desc.getTable(); + + switch (this.hmsTable.getDlaType()) { case HUDI: this.scanProvider = new ExternalHudiScanProvider(this.hmsTable); break; - case ICE_BERG: + case ICEBERG: this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable); break; case HIVE: @@ -178,61 +168,63 @@ public class ExternalFileScanNode extends ExternalScanNode { } } - private DLAType getDLAType() throws MetaNotFoundException { - if (hmsTable.getRemoteTable().getParameters().containsKey("table_type") - && hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) { - return DLAType.ICE_BERG; - } else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) { - return DLAType.HUDI; - } else { - return DLAType.HIVE; - } - } - @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); backendPolicy.init(); - initContext(context); + initContext(); } - private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException { + private void initContext() throws DdlException, MetaNotFoundException { context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); - context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - context.params = new TBrokerScanRangeParams(); + context.params = new TFileScanRangeParams(); if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) { Map serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) ? 
HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim"); String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim")) ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim"); - context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]); - context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]); - context.params.setColumnSeparatorStr(columnSeparator); - context.params.setLineDelimiterStr(lineDelimiter); - context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length); - context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length); + + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setLineDelimiterStr(lineDelimiter); + textParams.setColumnSeparatorStr(columnSeparator); + + context.params.setTextParams(textParams); } - Map slotDescByName = Maps.newHashMap(); + context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); + // Need re compute memory layout after set some slot descriptor to nullable + context.srcTupleDescriptor.computeStatAndMemLayout(); + + Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); List columns = hmsTable.getBaseSchema(false); for (Column column : columns) { SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setType(column.getType()); slotDesc.setIsMaterialized(true); slotDesc.setIsNullable(true); - slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR)); - context.params.addToSrcSlotIds(slotDesc.getId().asInt()); + slotDesc.setColumn(new Column(column)); slotDescByName.put(column.getName(), slotDesc); } - context.slotDescByName = slotDescByName; + + // Hive table must extract partition value from path and hudi/iceberg table keep partition field in file. + partitionKeys.addAll(scanProvider.getPathPartitionKeys()); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slotId); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + + context.params.addToRequiredSlots(slotInfo); + } } @Override public void finalize(Analyzer analyzer) throws UserException { try { - finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor); buildScanRange(); } catch (IOException e) { LOG.error("Finalize failed.", e); @@ -248,50 +240,40 @@ public class ExternalFileScanNode extends ExternalScanNode { return; } - THdfsParams hdfsParams = new THdfsParams(); String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); String fsName = fullPath.replace(filePath, ""); - hdfsParams.setFsName(fsName); - List partitionKeys = new ArrayList<>(); - for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) { - partitionKeys.add(fieldSchema.getName()); - } + // Todo: now every split will assign one scan range, we can merge them for optimize. 
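To make the slot split above concrete, consider a hypothetical Hive table with columns (id, name, dt) partitioned by dt: id and name are read from the data file, dt is parsed from the split path, and num_of_columns_from_file = 3 - 1 = 2. A small sketch of the resulting params (slot ids are made up):

    import org.apache.doris.thrift.TFileScanRangeParams;
    import org.apache.doris.thrift.TFileScanSlotInfo;

    final class RequiredSlotsExample {
        static TFileScanRangeParams buildParams() {
            TFileScanRangeParams params = new TFileScanRangeParams();
            params.setNumOfColumnsFromFile(2);           // 3 columns - 1 partition key

            int[] slotIds = {0, 1, 2};                   // illustrative slot ids for id, name, dt
            boolean[] isFileSlot = {true, true, false};  // dt comes from the path, not the file
            for (int i = 0; i < slotIds.length; i++) {
                TFileScanSlotInfo slot = new TFileScanSlotInfo();
                slot.setSlotId(slotIds[i]);
                slot.setIsFileSlot(isFileSlot[i]);
                params.addToRequiredSlots(slot);
            }
            return params;
        }
    }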
for (InputSplit split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TScanRangeLocations curLocations = newLocations(context.params); List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), partitionKeys); - int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size(); - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath, - numberOfColumnsFromFile); - rangeDesc.setHdfsParams(hdfsParams); - rangeDesc.setReadByColumnDef(true); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); + rangeDesc.getHdfsParams().setFsName(fsName); - curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); - // Put the last file - if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { - scanRangeLocations.add(curLocations); - } + scanRangeLocations.add(curLocations); } } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params) { - // Generate on broker scan range - TBrokerScanRange brokerScanRange = new TBrokerScanRange(); - brokerScanRange.setParams(params); - brokerScanRange.setBrokerAddresses(new ArrayList<>()); + private TScanRangeLocations newLocations(TFileScanRangeParams params) { + // Generate on file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); TScanRange scanRange = new TScanRange(); - scanRange.setBrokerScanRange(brokerScanRange); + scanRange.setExtScanRange(externalScanRange); // Locations TScanRangeLocations locations = new TScanRangeLocations(); @@ -306,66 +288,24 @@ public class ExternalFileScanNode extends ExternalScanNode { return locations; } - private TBrokerRangeDesc createBrokerRangeDesc( + private TFileRangeDesc createFileRangeDesc( FileSplit fileSplit, - List columnsFromPath, - int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException { - TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); + List columnsFromPath) throws DdlException, MetaNotFoundException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); rangeDesc.setFileType(scanProvider.getTableFileType()); rangeDesc.setFormatType(scanProvider.getTableFormatType()); rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - rangeDesc.setSplittable(true); rangeDesc.setStartOffset(fileSplit.getStart()); rangeDesc.setSize(fileSplit.getLength()); - rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); rangeDesc.setColumnsFromPath(columnsFromPath); // set hdfs params for hdfs file type. 
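Putting newLocations() and createFileRangeDesc() together, the new thrift nesting is TScanRange -> TExternalScanRange -> TFileScanRange -> list of TFileRangeDesc. A minimal sketch of how one split becomes a scan range (the path and offsets are made up):

    import org.apache.doris.thrift.TExternalScanRange;
    import org.apache.doris.thrift.TFileRangeDesc;
    import org.apache.doris.thrift.TFileScanRange;
    import org.apache.doris.thrift.TFileScanRangeParams;
    import org.apache.doris.thrift.TScanRange;

    final class FileScanRangeSketch {
        static TScanRange toScanRange(TFileScanRangeParams params) {
            TFileRangeDesc range = new TFileRangeDesc();
            range.setPath("/user/hive/warehouse/t/dt=2022-06-29/000000_0"); // hypothetical split
            range.setStartOffset(0);
            range.setSize(-1);                        // -1: read to the end of the file

            TFileScanRange fileScanRange = new TFileScanRange();
            fileScanRange.setParams(params);          // params shared by all ranges of the node
            fileScanRange.addToRanges(range);

            TExternalScanRange extScanRange = new TExternalScanRange();
            extScanRange.setFileScanRange(fileScanRange);

            TScanRange scanRange = new TScanRange();
            scanRange.setExtScanRange(extScanRange);
            return scanRange;
        }
    }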
if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties()); + rangeDesc.setHdfsParams(tHdfsParams); } return rangeDesc; } - private void finalizeParams( - Map slotDescByName, - TBrokerScanRangeParams params, - TupleDescriptor srcTupleDesc) throws UserException { - Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); - for (SlotDescriptor destSlotDesc : desc.getSlots()) { - Expr expr; - SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); - if (srcSlotDesc != null) { - destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); - // If dest is allow null, we set source to nullable - if (destSlotDesc.getColumn().isAllowNull()) { - srcSlotDesc.setIsNullable(true); - } - expr = new SlotRef(srcSlotDesc); - } else { - Column column = destSlotDesc.getColumn(); - if (column.getDefaultValue() != null) { - expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - throw new AnalysisException("column has no source field, column=" + column.getName()); - } - } - } - - expr = castToSlot(destSlotDesc, expr); - params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); - } - params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); - params.setDestTupleId(desc.getId().asInt()); - params.setStrictMode(false); - params.setSrcTupleId(srcTupleDesc.getId().asInt()); - - // Need re compute memory layout after set some slot descriptor to nullable - srcTupleDesc.computeStatAndMemLayout(); - } - @Override public int getNumInstances() { return scanRangeLocations.size(); @@ -373,18 +313,19 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override protected void toThrift(TPlanNode planNode) { - planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE); - TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); + planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); + TFileScanNode fileScanNode = new TFileScanNode(); + fileScanNode.setTupleId(desc.getId().asInt()); if (!preFilterConjuncts.isEmpty()) { if (Config.enable_vectorized_load && vpreFilterConjunct != null) { - brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); + fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); } else { for (Expr e : preFilterConjuncts) { - brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + fileScanNode.addToPreFilterExprs(e.treeToThrift()); } } } - planNode.setBrokerScanNode(brokerScanNode); + planNode.setFileScanNode(fileScanNode); } @Override @@ -394,15 +335,8 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - String url; - try { - url = scanProvider.getMetaStoreUrl(); - } catch (MetaNotFoundException e) { - LOG.warn("Can't get url error", e); - url = "Can't get url error."; - } return prefix + "DATABASE: " + hmsTable.getDbName() + "\n" + prefix + "TABLE: " + hmsTable.getName() + "\n" - + prefix + "HIVE URL: " + url + "\n"; + + prefix + "HIVE URL: " + scanProvider.getMetaStoreUrl() + "\n"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java index 
2fc626ab70..ebdd7a768a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java @@ -39,11 +39,13 @@ public interface ExternalFileScanProvider { TFileType getTableFileType(); - String getMetaStoreUrl() throws MetaNotFoundException; + String getMetaStoreUrl(); InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException; Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; Map<String, String> getTableProperties() throws MetaNotFoundException; + + List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java index 979a5a29d3..626870d43e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java @@ -28,7 +28,6 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -77,8 +76,8 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider { } @Override - public String getMetaStoreUrl() throws MetaNotFoundException { - return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.name()); + public String getMetaStoreUrl() { + return hmsTable.getMetastoreUri(); } @Override @@ -146,4 +145,9 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider { public Map<String, String> getTableProperties() throws MetaNotFoundException { return hmsTable.getRemoteTable().getParameters(); } + + @Override + public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java index 2951c1fb7a..1638d9429c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java @@ -19,8 +19,12 @@ package org.apache.doris.planner.external; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.thrift.TFileFormatType; +import java.util.Collections; +import java.util.List; + /** * A file scan provider for hudi. * HudiProvider extends the hive provider since they both use the input format interface to get the split.
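A short usage sketch of the new getPathPartitionKeys() contract (the provider variable and path are hypothetical): the Hive provider returns the metastore partition keys so values can be parsed from the split path, while the Hudi and Iceberg providers return an empty list because those formats keep the partition columns inside the data files.

    import java.util.List;
    import org.apache.doris.common.util.BrokerUtil;
    import org.apache.doris.planner.external.ExternalFileScanProvider;

    final class PathPartitionKeysSketch {
        static List<String> partitionValues(ExternalFileScanProvider scanProvider, String splitPath)
                throws Exception {
            // e.g. ["dt"] for a Hive table partitioned by dt, [] for Hudi/Iceberg
            List<String> partitionKeys = scanProvider.getPathPartitionKeys();
            // For "/warehouse/t/dt=2022-06-29/000000_0" and ["dt"] this yields ["2022-06-29"]
            return BrokerUtil.parseColumnsFromPath(splitPath, partitionKeys);
        }
    }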
@@ -35,4 +39,9 @@ public class ExternalHudiScanProvider extends ExternalHiveScanProvider { public TFileFormatType getTableFormatType() throws DdlException { return TFileFormatType.FORMAT_PARQUET; } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return Collections.emptyList(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java index 28ba3dad0a..6f321fc4b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java @@ -18,16 +18,15 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.IcebergProperty; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.external.iceberg.HiveCatalog; import org.apache.doris.external.iceberg.util.IcebergUtils; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -39,7 +38,10 @@ import org.apache.iceberg.expressions.Expression; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * A file scan provider for iceberg. @@ -98,8 +100,20 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider { } private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { - HiveCatalog hiveCatalog = new HiveCatalog(); - hiveCatalog.initialize(new IcebergProperty(getTableProperties())); + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = new Configuration(); + hiveCatalog.setConf(conf); + // initialize hive catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("hive.metastore.uris", getMetaStoreUrl()); + catalogProperties.put("uri", getMetaStoreUrl()); + hiveCatalog.initialize("hive", catalogProperties); + return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName())); } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return Collections.emptyList(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index cc806f7b3a..4b23337038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -45,4 +45,5 @@ public enum StatisticalType { TABLE_FUNCTION_NODE, UNION_NODE, TABLE_VALUED_FUNCTION_NODE, + FILE_SCAN_NODE, } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4dcddcfd1e..1b627f4879 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -22,6 +22,7 @@ include "Exprs.thrift" include "Types.thrift" include "Opcodes.thrift" include "Partitions.thrift" +include "Descriptors.thrift" enum TPlanNodeType { 
OLAP_SCAN_NODE, @@ -53,6 +54,7 @@ enum TPlanNodeType { ODBC_SCAN_NODE, TABLE_FUNCTION_NODE, TABLE_VALUED_FUNCTION_SCAN_NODE, + FILE_SCAN_NODE, } // phases of an execution node @@ -213,8 +215,46 @@ struct TEsScanRange { 4: required i32 shard_id } -struct TFileScanRange { +struct TFileTextScanRangeParams { + 1: optional string column_separator_str; + 2: optional string line_delimiter_str; +} +struct TFileScanSlotInfo { + 1: optional Types.TSlotId slot_id; + 2: optional bool is_file_slot; +} + +struct TFileScanRangeParams { + // use src_tuple_id to get all slots from the src table, including both file slots and partition slots. + 1: optional Types.TTupleId src_tuple_id; + // num_of_columns_from_file splits required_slots into file slots and partition slots + 2: optional i32 num_of_columns_from_file; + // all selected slots, which may be composed of file columns and partition values. + 3: optional list<TFileScanSlotInfo> required_slots; + + 4: optional TFileTextScanRangeParams text_params; +} + +struct TFileRangeDesc { + 1: optional Types.TFileType file_type; + 2: optional TFileFormatType format_type; + // Path of this range + 3: optional string path; + // Offset at which this range starts in the file + 4: optional i64 start_offset; + // Size of this range; if size = -1, read to the end of the file + 5: optional i64 size; + // columns parsed from the file path should be placed after the columns read from the file + 6: optional list<string> columns_from_path; + + 7: optional THdfsParams hdfs_params; +} + +// HDFS file scan range +struct TFileScanRange { + 1: optional list<TFileRangeDesc> ranges + 2: optional TFileScanRangeParams params } // Scan range for external datasource, such as file on hdfs, es datanode, etc. @@ -281,6 +321,11 @@ struct TBrokerScanNode { 4: optional list<Exprs.TExpr> pre_filter_exprs } +struct TFileScanNode { + 1: optional Types.TTupleId tuple_id + 2: optional list<Exprs.TExpr> pre_filter_exprs +} + struct TEsScanNode { 1: required Types.TTupleId tuple_id 2: optional map<string, string> properties @@ -830,6 +875,9 @@ struct TPlanNode { // output column 42: optional list<Types.TSlotId> output_slot_ids 43: optional TTableValuedFunctionScanNode table_valued_func_scan_node + + // file scan node + 44: optional TFileScanNode file_scan_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first