diff --git a/.gitignore b/.gitignore index 80e2ae605a..03a2c7339f 100644 --- a/.gitignore +++ b/.gitignore @@ -71,6 +71,7 @@ fe/fe-core/src/main/resources/static/ # BE be/build*/ +be/cmake-build*/ be/ut_build*/ be/src/gen_cpp/*.[cc, cpp, h] be/src/gen_cpp/opcode diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index b2de6bc180..54d2fdbea6 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -60,6 +60,7 @@ #include "util/debug_util.h" #include "util/runtime_profile.h" #include "vec/core/block.h" +#include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" @@ -393,6 +394,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::TABLE_FUNCTION_NODE: case TPlanNodeType::BROKER_SCAN_NODE: case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: + case TPlanNodeType::FILE_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -555,6 +557,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); + case TPlanNodeType::FILE_SCAN_NODE: + *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + + return Status::OK(); + case TPlanNodeType::REPEAT_NODE: if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::VRepeatNode(pool, tnode, descs)); @@ -664,6 +671,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); } void ExecNode::try_do_aggregate_serde_improve() { diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h index b729d2012d..18f1e4185f 100644 --- a/be/src/exec/text_converter.h +++ b/be/src/exec/text_converter.h @@ -53,6 +53,9 @@ public: bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, bool copy_string, bool need_escape); + bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr, + const char* data, size_t len, bool copy_string, bool need_escape); + // Removes escape characters from len characters of the null-terminated string src, // and copies the unescaped string into dest, changing *len to the unescaped length. // No null-terminator is added to dest. diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 0d6297f501..b931d4c560 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -200,6 +200,12 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, col_ptr = &nullable_column->get_nested_column(); } } + return write_vec_column(slot_desc, col_ptr, data, len, copy_string, need_escape); +} + +inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc, + vectorized::IColumn* col_ptr, const char* data, + size_t len, bool copy_string, bool need_escape) { StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; // Parse the raw-text data. Translate the text string to internal format. 
@@ -209,6 +215,7 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, HyperLogLog(Slice(data, len))); break; } + case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { if (need_escape) { @@ -305,8 +312,7 @@ inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, if (parse_result == StringParser::PARSE_FAILURE) { if (true == slot_desc->is_nullable()) { - auto* nullable_column = - reinterpret_cast<doris::vectorized::ColumnNullable*>(column_ptr->get()); + auto* nullable_column = reinterpret_cast<doris::vectorized::ColumnNullable*>(col_ptr); size_t size = nullable_column->get_null_map_data().size(); doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data(); null_map_data[size - 1] = 1; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index ad03c00b81..b32142ee80 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -206,7 +206,11 @@ set(VEC_FILES runtime/vfile_result_writer.cpp runtime/vpartition_info.cpp utils/arrow_column_to_doris_column.cpp - runtime/vsorted_run_merger.cpp) + runtime/vsorted_run_merger.cpp + exec/file_arrow_scanner.cpp + exec/file_scanner.cpp + exec/file_scan_node.cpp + exec/file_text_scanner.cpp) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp new file mode 100644 index 0000000000..57defdb8ef --- /dev/null +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "vec/exec/file_arrow_scanner.h" + +#include "exec/arrow/parquet_reader.h" +#include "io/buffered_reader.h" +#include "io/hdfs_reader_writer.h" +#include "runtime/descriptors.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +namespace doris::vectorized { + +FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), + _cur_file_reader(nullptr), + _cur_file_eof(false), + _batch(nullptr), + _arrow_batch_cur_idx(0) {} + +FileArrowScanner::~FileArrowScanner() { + close(); +} + +Status FileArrowScanner::_open_next_reader() { + // open_file_reader + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } + + while (true) { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TFileRangeDesc& range = _ranges[_next_range++]; + std::unique_ptr file_reader; + + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, + range.start_offset, &hdfs_reader)); + file_reader.reset(new BufferedReader(_profile, hdfs_reader)); + RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { + file_reader->close(); + continue; + } + + int32_t num_of_columns_from_file = _file_slot_descs.size(); + + _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), + num_of_columns_from_file); + + Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone()); + + if (status.is_end_of_file()) { + continue; + } else { + if (!status.ok()) { + std::stringstream ss; + ss << " file: " << range.path << " error:" << status.get_error_msg(); + return Status::InternalError(ss.str()); + } else { + return status; + } + } + } +} + +Status FileArrowScanner::open() { + RETURN_IF_ERROR(FileScanner::open()); + if (_ranges.empty()) { + return Status::OK(); + } + return Status::OK(); +} + +// get next available arrow batch +Status FileArrowScanner::_next_arrow_batch() { + _arrow_batch_cur_idx = 0; + // first, init file reader + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + } + // second, loop until find available arrow batch or EOF + while (!_scanner_eof) { + RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); + if (_cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + continue; + } + if (_batch->num_rows() == 0) { + continue; + } + return Status::OK(); + } + return Status::EndOfFile("EOF"); +} + +Status FileArrowScanner::_init_arrow_batch_if_necessary() { + // 1. init batch if first time + // 2. 
reset reader if end of file + Status status; + if (_scanner_eof) { + return Status::EndOfFile("EOF"); + } + if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { + return _next_arrow_batch(); + } + return status; +} + +Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) { + SCOPED_TIMER(_read_timer); + // init arrow batch + { + Status st = _init_arrow_batch_if_necessary(); + if (!st.ok()) { + if (!st.is_end_of_file()) { + return st; + } + *eof = true; + return Status::OK(); + } + } + + RETURN_IF_ERROR(init_block(block)); + // convert arrow batch to block until reach the batch_size + while (!_scanner_eof) { + // cast arrow type to PT0 and append it to block + // for example: arrow::Type::INT16 => TYPE_SMALLINT + RETURN_IF_ERROR(_append_batch_to_block(block)); + // finalize the block if full + if (_rows >= _state->batch_size()) { + break; + } + auto status = _next_arrow_batch(); + // if ok, append the batch to the columns + if (status.ok()) { + continue; + } + // return error if not EOF + if (!status.is_end_of_file()) { + return status; + } + _cur_file_eof = true; + break; + } + + return finalize_block(block, eof); +} + +Status FileArrowScanner::_append_batch_to_block(Block* block) { + size_t num_elements = std::min((_state->batch_size() - block->rows()), + (_batch->num_rows() - _arrow_batch_cur_idx)); + for (auto i = 0; i < _file_slot_descs.size(); ++i) { + SlotDescriptor* slot_desc = _file_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto* array = _batch->GetColumnByName(slot_desc->col_name()).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column( + array, _arrow_batch_cur_idx, column_with_type_and_name.column, + column_with_type_and_name.type, num_elements, _state->timezone())); + } + _rows += num_elements; + _arrow_batch_cur_idx += num_elements; + return Status::OK(); +} + +void FileArrowScanner::close() { + FileScanner::close(); + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } +} + +VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} + +ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); +} + +VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} + +ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h new file mode 100644 index 0000000000..3f0ee32082 --- /dev/null +++ b/be/src/vec/exec/file_arrow_scanner.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "util/runtime_profile.h" +#include "vec/exec/file_scanner.h" + +namespace doris::vectorized { + +// VArrow scanner convert the data read from orc|parquet to doris's columns. +class FileArrowScanner : public FileScanner { +public: + FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); + + ~FileArrowScanner() override; + + // Open this scanner, will initialize information need to + Status open() override; + + Status get_next(Block* block, bool* eof) override; + + void close() override; + +protected: + virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) = 0; + +private: + // Read next buffer from reader + Status _open_next_reader(); + Status _next_arrow_batch(); + Status _init_arrow_batch_if_necessary(); + Status _append_batch_to_block(Block* block); + +private: + // Reader + ArrowReaderWrap* _cur_file_reader; + bool _cur_file_eof; // is read over? + std::shared_ptr<arrow::RecordBatch> _batch; + size_t _arrow_batch_cur_idx; +}; + +class VFileParquetScanner final : public FileArrowScanner { +public: + VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); + + ~VFileParquetScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; +}; + +class VFileORCScanner final : public FileArrowScanner { +public: + VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); + + ~VFileORCScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp new file mode 100644 index 0000000000..b01fc739e2 --- /dev/null +++ b/be/src/vec/exec/file_scan_node.cpp @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/file_scan_node.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/mem_tracker.h" +#include "runtime/runtime_state.h" +#include "runtime/string_value.h" +#include "runtime/tuple.h" +#include "runtime/tuple_row.h" +#include "util/runtime_profile.h" +#include "util/thread.h" +#include "util/types.h" +#include "vec/exec/file_arrow_scanner.h" +#include "vec/exec/file_text_scanner.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { + +FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _tuple_id(tnode.file_scan_node.tuple_id), + _runtime_state(nullptr), + _tuple_desc(nullptr), + _num_running_scanners(0), + _scan_finished(false), + _max_buffered_batches(32), + _wait_scanner_timer(nullptr) {} + +Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::init(tnode, state)); + auto& file_scan_node = tnode.file_scan_node; + + if (file_scan_node.__isset.pre_filter_exprs) { + _pre_filter_texprs = file_scan_node.pre_filter_exprs; + } + + return Status::OK(); +} + +Status FileScanNode::prepare(RuntimeState* state) { + VLOG_QUERY << "FileScanNode prepare"; + RETURN_IF_ERROR(ScanNode::prepare(state)); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + // get tuple desc + _runtime_state = state; + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id; + return Status::InternalError(ss.str()); + } + + // Initialize slots map + for (auto slot : _tuple_desc->slots()) { + auto pair = _slots_map.emplace(slot->col_name(), slot); + if (!pair.second) { + std::stringstream ss; + ss << "Failed to insert slot, col_name=" << slot->col_name(); + return Status::InternalError(ss.str()); + } + } + + // Profile + _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); + + return Status::OK(); +} + +Status FileScanNode::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + RETURN_IF_CANCELLED(state); + + RETURN_IF_ERROR(start_scanners()); + + return Status::OK(); +} + +Status FileScanNode::start_scanners() { + { + std::unique_lock l(_batch_queue_lock); + _num_running_scanners = 1; + } + _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size()); + return Status::OK(); +} + +Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + // check if CANCELLED. 
+ if (state->is_cancelled()) { + std::unique_lock l(_batch_queue_lock); + if (update_status(Status::Cancelled("Cancelled"))) { + // Notify all scanners + _queue_writer_cond.notify_all(); + } + } + + if (_scan_finished.load()) { + *eos = true; + return Status::OK(); + } + + const int batch_size = _runtime_state->batch_size(); + while (true) { + std::shared_ptr scanner_block; + { + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && _block_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::Cancelled("Cancelled"))) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_block_queue.empty()) { + scanner_block = _block_queue.front(); + _block_queue.pop_front(); + } + } + + // All scanner has been finished, and all cached batch has been read + if (!scanner_block) { + if (_mutable_block && !_mutable_block->empty()) { + *block = _mutable_block->to_block(); + reached_limit(block, eos); + LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit."; + } + _scan_finished.store(true); + *eos = true; + return Status::OK(); + } + // notify one scanner + _queue_writer_cond.notify_one(); + + if (UNLIKELY(!_mutable_block)) { + _mutable_block.reset(new MutableBlock(scanner_block->clone_empty())); + } + + if (_mutable_block->rows() + scanner_block->rows() < batch_size) { + // merge scanner_block into _mutable_block + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + continue; + } else { + if (_mutable_block->empty()) { + // directly use scanner_block + *block = *scanner_block; + } else { + // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next. 
+ *block = _mutable_block->to_block(); + _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns()); + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + } + break; + } + } + + reached_limit(block, eos); + if (*eos) { + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + LOG(INFO) << "FileScanNode ReachedLimit."; + } else { + *eos = false; + } + + return Status::OK(); +} + +Status FileScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + _scan_finished.store(true); + _queue_writer_cond.notify_all(); + _queue_reader_cond.notify_all(); + for (int i = 0; i < _scanner_threads.size(); ++i) { + _scanner_threads[i].join(); + } + + // Close + _batch_queue.clear(); + return ExecNode::close(state); +} + +Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) { + //create scanner object and open + std::unique_ptr scanner = create_scanner(scan_range, counter); + RETURN_IF_ERROR(scanner->open()); + bool scanner_eof = false; + while (!scanner_eof) { + RETURN_IF_CANCELLED(_runtime_state); + // If we have finished all works + if (_scan_finished.load() || !_process_status.ok()) { + return Status::OK(); + } + + std::shared_ptr block(new vectorized::Block()); + RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); + if (block->rows() == 0) { + continue; + } + auto old_rows = block->rows(); + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), + _tuple_desc->slots().size())); + counter->num_rows_unselected += old_rows - block->rows(); + if (block->rows() == 0) { + continue; + } + + std::unique_lock l(_batch_queue_lock); + while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() && + // stop pushing more batch if + // 1. too many batches in queue, or + // 2. at least one batch in queue and memory exceed limit. + (_block_queue.size() >= _max_buffered_batches || + (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) { + _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); + } + // Process already set failed, so we just return OK + if (!_process_status.ok()) { + return Status::OK(); + } + // Scan already finished, just return + if (_scan_finished.load()) { + return Status::OK(); + } + // Runtime state is canceled, just return cancel + if (_runtime_state->is_cancelled()) { + return Status::Cancelled("Cancelled"); + } + // Queue size Must be smaller than _max_buffered_batches + _block_queue.push_back(block); + + // Notify reader to + _queue_reader_cond.notify_one(); + } + return Status::OK(); +} + +void FileScanNode::scanner_worker(int start_idx, int length) { + Thread::set_self_name("file_scanner"); + Status status = Status::OK(); + ScannerCounter counter; + for (int i = 0; i < length && status.ok(); ++i) { + const TFileScanRange& scan_range = + _scan_ranges[start_idx + i].scan_range.ext_scan_range.file_scan_range; + status = scanner_scan(scan_range, &counter); + if (!status.ok()) { + LOG(WARNING) << "Scanner[" << start_idx + i + << "] process failed. 
status=" << status.get_error_msg(); + } + } + + // Update stats + _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); + _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); + + // scanner is going to finish + { + std::lock_guard l(_batch_queue_lock); + if (!status.ok()) { + update_status(status); + } + // This scanner will finish + _num_running_scanners--; + } + _queue_reader_cond.notify_all(); + // If one scanner failed, others don't need scan any more + if (!status.ok()) { + _queue_writer_cond.notify_all(); + } +} + +std::unique_ptr FileScanNode::create_scanner(const TFileScanRange& scan_range, + ScannerCounter* counter) { + FileScanner* scan = nullptr; + switch (scan_range.ranges[0].format_type) { + case TFileFormatType::FORMAT_PARQUET: + scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + break; + case TFileFormatType::FORMAT_ORC: + scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + break; + + default: + scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + } + std::unique_ptr scanner(scan); + return scanner; +} + +// This function is called after plan node has been prepared. +Status FileScanNode::set_scan_ranges(const std::vector& scan_ranges) { + _scan_ranges = scan_ranges; + return Status::OK(); +} + +void FileScanNode::debug_string(int ident_level, std::stringstream* out) const { + (*out) << "FileScanNode"; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h new file mode 100644 index 0000000000..5106d654cc --- /dev/null +++ b/be/src/vec/exec/file_scan_node.h @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/scan_node.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "runtime/descriptors.h" +#include "vec/exec/file_scanner.h" +namespace doris { + +class RuntimeState; +class Status; + +namespace vectorized { +class FileScanNode final : public ScanNode { +public: + FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~FileScanNode() override = default; + + // Called after create this scan node + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + // Prepare partition infos & set up timer + Status prepare(RuntimeState* state) override; + + // Start file scan using ParquetScanner or OrcScanner. 
+ Status open(RuntimeState* state) override; + + // Fill the next row batch by calling next() on the scanner, + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { + return Status::NotSupported("Not Implemented FileScanNode::get_next."); + } + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + // Close the scanner, and report errors. + Status close(RuntimeState* state) override; + + // No use + Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + +private: + // Write debug string of this into out. + void debug_string(int indentation_level, std::stringstream* out) const override; + + // Update process status to one failed status, + // NOTE: Must hold the mutex of this scan node + bool update_status(const Status& new_status) { + if (_process_status.ok()) { + _process_status = new_status; + return true; + } + return false; + } + + std::unique_ptr<FileScanner> create_scanner(const TFileScanRange& scan_range, + ScannerCounter* counter); + + Status start_scanners(); + + void scanner_worker(int start_idx, int length); + // Scan one range + Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter); + + TupleId _tuple_id; + RuntimeState* _runtime_state; + TupleDescriptor* _tuple_desc; + std::map<std::string, SlotDescriptor*> _slots_map; + std::vector<TScanRangeParams> _scan_ranges; + + std::mutex _batch_queue_lock; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::deque> _batch_queue; + + int _num_running_scanners; + + std::atomic<bool> _scan_finished; + + Status _process_status; + + std::vector<std::thread> _scanner_threads; + + int _max_buffered_batches; + + // The origin preceding filter exprs. + // These exprs will be converted to expr context + // in XXXScanner. + // Because the row descriptor used for these exprs is `src_row_desc`, + // which is initialized in XXXScanner. + std::vector<TExpr> _pre_filter_texprs; + + RuntimeProfile::Counter* _wait_scanner_timer; + + std::deque<std::shared_ptr<vectorized::Block>> _block_queue; + std::unique_ptr<MutableBlock> _mutable_block; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp new file mode 100644 index 0000000000..106b94cd73 --- /dev/null +++ b/be/src/vec/exec/file_scanner.cpp @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "file_scanner.h" + +#include + +#include + +#include "common/logging.h" +#include "common/utils.h" +#include "exec/exec_node.h" +#include "exec/text_converter.hpp" +#include "exprs/expr_context.h" +#include "runtime/descriptors.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" + +namespace doris::vectorized { + +FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : _state(state), + _params(params), + _ranges(ranges), + _next_range(0), + _counter(counter), +#if BE_TEST + _mem_tracker(new MemTracker()), +#else + _mem_tracker(MemTracker::create_tracker( + -1, state->query_type() == TQueryType::LOAD + ? "FileScanner:" + std::to_string(state->load_job_id()) + : "FileScanner:Select")), +#endif + _mem_pool(std::make_unique(_mem_tracker.get())), + _pre_filter_texprs(pre_filter_texprs), + _profile(profile), + _rows_read_counter(nullptr), + _read_timer(nullptr), + _scanner_eof(false) { + _text_converter.reset(new (std::nothrow) TextConverter('\\')); +} + +Status FileScanner::open() { + RETURN_IF_ERROR(_init_expr_ctxes()); + + _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); + + return Status::OK(); +} + +Status FileScanner::_init_expr_ctxes() { + const TupleDescriptor* src_tuple_desc = + _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); + if (src_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; + return Status::InternalError(ss.str()); + } + DCHECK(!_ranges.empty()); + + std::map _full_src_index_map; + std::map _full_src_slot_map; + int index = 0; + for (const auto& slot_desc : src_tuple_desc->slots()) { + _full_src_slot_map.emplace(slot_desc->id(), slot_desc); + _full_src_index_map.emplace(slot_desc->id(), index++); + } + + _num_of_columns_from_file = _params.num_of_columns_from_file; + for (const auto& slot_info : _params.required_slots) { + auto slot_id = slot_info.slot_id; + auto it = _full_src_slot_map.find(slot_id); + if (it == std::end(_full_src_slot_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_id; + return Status::InternalError(ss.str()); + } + _required_slot_descs.emplace_back(it->second); + if (slot_info.is_file_slot) { + _file_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _file_slot_index_map.emplace(slot_id, iti->second); + } else { + _partition_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); + } + } + + _row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector({_params.src_tuple_id}), + std::vector({false}))); + + // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor + if (!_pre_filter_texprs.empty()) { + // for vectorized, preceding filter exprs should be compounded to one passed from fe. 
+ DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + } + + return Status::OK(); +} + +void FileScanner::close() { + if (_vpre_filter_ctx_ptr) { + (*_vpre_filter_ctx_ptr)->close(_state); + } +} + +Status FileScanner::init_block(vectorized::Block* block) { + (*block).clear(); + _rows = 0; + for (const auto& slot_desc : _required_slot_descs) { + if (slot_desc == nullptr) { + continue; + } + auto is_nullable = slot_desc->is_nullable(); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), + is_nullable); + if (data_type == nullptr) { + return Status::NotSupported( + fmt::format("Not support type for column:{}", slot_desc->col_name())); + } + MutableColumnPtr data_column = data_type->create_column(); + (*block).insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + return Status::OK(); +} + +Status FileScanner::_filter_block(vectorized::Block* _block) { + auto origin_column_num = (*_block).columns(); + // filter block + auto old_rows = (*_block).rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, _block, + origin_column_num)); + _counter->num_rows_unselected += old_rows - (*_block).rows(); + return Status::OK(); +} + +Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) { + *eof = _scanner_eof; + RETURN_IF_ERROR(_fill_columns_from_path(_block)); + if (LIKELY(_rows > 0)) { + RETURN_IF_ERROR(_filter_block(_block)); + } + + return Status::OK(); +} + +Status FileScanner::_fill_columns_from_path(vectorized::Block* _block) { + const TFileRangeDesc& range = _ranges.at(_next_range - 1); + if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { + size_t rows = _rows; + + for (const auto& slot_desc : _partition_slot_descs) { + if (slot_desc == nullptr) continue; + auto it = _partition_slot_index_map.find(slot_desc->id()); + if (it == std::end(_partition_slot_index_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id(); + return Status::InternalError(ss.str()); + } + const std::string& column_from_path = range.columns_from_path[it->second]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(col_ptr); + nullable_column->get_null_map_data().push_back(0); + col_ptr = &nullable_column->get_nested_column(); + } + + for (size_t j = 0; j < rows; ++j) { + _text_converter->write_vec_column(slot_desc, col_ptr, + const_cast(column_from_path.c_str()), + column_from_path.size(), true, false); + } + } + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h new file mode 100644 index 0000000000..cb21b876d5 --- /dev/null +++ b/be/src/vec/exec/file_scanner.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/text_converter.h" +#include "exprs/expr.h" +#include "util/runtime_profile.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { +class FileScanner { +public: + FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, + const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); + + virtual ~FileScanner() = default; + + // Open this scanner, will initialize information need to + virtual Status open(); + + // Get next block + virtual Status get_next(vectorized::Block* block, bool* eof) { + return Status::NotSupported("Not Implemented get block"); + } + + // Close this scanner + virtual void close() = 0; + +protected: + Status finalize_block(vectorized::Block* dest_block, bool* eof); + Status init_block(vectorized::Block* block); + + std::unique_ptr<TextConverter> _text_converter; + + RuntimeState* _state; + const TFileScanRangeParams& _params; + + const std::vector<TFileRangeDesc>& _ranges; + int _next_range; + // used for process stat + ScannerCounter* _counter; + + // Used for constructing tuple + std::vector<SlotDescriptor*> _required_slot_descs; + std::vector<SlotDescriptor*> _file_slot_descs; + std::map<SlotId, int> _file_slot_index_map; + std::vector<SlotDescriptor*> _partition_slot_descs; + std::map<SlotId, int> _partition_slot_index_map; + + std::unique_ptr<RowDescriptor> _row_desc; + + std::shared_ptr<MemTracker> _mem_tracker; + // Mem pool used to allocate _src_tuple and _src_tuple_row + std::unique_ptr<MemPool> _mem_pool; + + const std::vector<TExpr> _pre_filter_texprs; + + // Profile + RuntimeProfile* _profile; + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + + bool _scanner_eof = false; + int _rows = 0; + + std::unique_ptr<doris::vectorized::VExprContext*> _vpre_filter_ctx_ptr; + int _num_of_columns_from_file; + +private: + Status _init_expr_ctxes(); + Status _filter_block(vectorized::Block* output_block); + Status _fill_columns_from_path(vectorized::Block* output_block); +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp new file mode 100644 index 0000000000..03e2dd9275 --- /dev/null +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/file_text_scanner.h" + +#include +#include + +#include + +#include "exec/exec_node.h" +#include "exec/plain_text_line_reader.h" +#include "exec/text_converter.h" +#include "exec/text_converter.hpp" +#include "exprs/expr_context.h" +#include "io/buffered_reader.h" +#include "io/hdfs_reader_writer.h" +#include "util/types.h" +#include "util/utf8_check.h" + +namespace doris::vectorized { + +FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& ranges, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_line_reader_eof(false), + _skip_lines(0), + _success(false) + +{ + if (params.__isset.text_params) { + auto text_params = params.text_params; + if (text_params.__isset.column_separator_str) { + _value_separator = text_params.column_separator_str; + _value_separator_length = _value_separator.length(); + } + if (text_params.__isset.line_delimiter_str) { + _line_delimiter = text_params.line_delimiter_str; + _line_delimiter_length = _line_delimiter.length(); + } + } +} + +FileTextScanner::~FileTextScanner() { + close(); +} + +Status FileTextScanner::open() { + RETURN_IF_ERROR(FileScanner::open()); + + if (_ranges.empty()) { + return Status::OK(); + } + _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); + return Status::OK(); +} + +void FileTextScanner::close() { + FileScanner::close(); + + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } +} + +Status FileTextScanner::get_next(Block* block, bool* eof) { + SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(init_block(block)); + + const int batch_size = _state->batch_size(); + + while (_rows < batch_size && !_scanner_eof) { + if (_cur_line_reader == nullptr || _cur_line_reader_eof) { + RETURN_IF_ERROR(_open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + } + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); + std::unique_ptr u_ptr; + u_ptr.reset(ptr); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + { + COUNTER_UPDATE(_rows_read_counter, 1); + RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); + } + } + + return finalize_block(block, eof); +} + +Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) { + RETURN_IF_ERROR(_line_split_to_values(line)); + if (!_success) { + // If not success, which means we met an invalid row, return. 
+ return Status::OK(); + } + + for (int i = 0; i < _split_values.size(); ++i) { + auto slot_desc = _file_slot_descs[i]; + const Slice& value = _split_values[i]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(col_ptr); + nullable_column->get_null_map_data().push_back(0); + col_ptr = &nullable_column->get_nested_column(); + } + + if (value.size == 2 && value.data[0] == '\\' && value[1] == 'N') { + col_ptr->insert_default(); + continue; + } + _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false); + } + _rows++; + return Status::OK(); +} + +Status FileTextScanner::_open_next_reader() { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_line_reader()); + _next_range++; + + return Status::OK(); +} + +Status FileTextScanner::_open_file_reader() { + const TFileRangeDesc& range = _ranges[_next_range]; + + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, + range.start_offset, &hdfs_reader)); + _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader)); + return _cur_file_reader->open(); +} + +Status FileTextScanner::_open_line_reader() { + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } + + const TFileRangeDesc& range = _ranges[_next_range]; + int64_t size = range.size; + if (range.start_offset != 0) { + if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) { + std::stringstream ss; + ss << "For now we do not support split compressed file"; + return Status::InternalError(ss.str()); + } + size += 1; + // not first range will always skip one line + _skip_lines = 1; + } + + // open line reader + switch (range.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, + _line_delimiter, _line_delimiter_length); + break; + default: { + std::stringstream ss; + ss << "Unknown format type, cannot init line reader, type=" << range.format_type; + return Status::InternalError(ss.str()); + } + } + + _cur_line_reader_eof = false; + + return Status::OK(); +} + +Status FileTextScanner::_line_split_to_values(const Slice& line) { + if (!validate_utf8(line.data, line.size)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + []() -> std::string { return "Unable to display"; }, + []() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "{}", "Unable to display"); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + _counter->num_rows_filtered++; + _success = false; + return Status::OK(); + } + + RETURN_IF_ERROR(_split_line(line)); + + _success = true; + return Status::OK(); +} + +Status FileTextScanner::_split_line(const Slice& line) { + _split_values.clear(); + std::vector tmp_split_values; + tmp_split_values.reserve(_num_of_columns_from_file); + + const char* value = line.data; + size_t start = 0; // point to the start pos of next col value. + size_t curpos = 0; // point to the start pos of separator matching sequence. + size_t p1 = 0; // point to the current pos of separator matching sequence. + size_t non_space = 0; // point to the last pos of non_space charactor. 
+ + // Separator: AAAA + // + // p1 + // ▼ + // AAAA + // 1000AAAA2000AAAA + // ▲ ▲ + // Start │ + // curpos + + while (curpos < line.size) { + if (*(value + curpos + p1) != _value_separator[p1]) { + // Not match, move forward: + curpos += (p1 == 0 ? 1 : p1); + p1 = 0; + } else { + p1++; + if (p1 == _value_separator_length) { + // Match a separator + non_space = curpos; + // Trim tailing spaces. Be consistent with hive and trino's behavior. + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + tmp_split_values.emplace_back(value + start, non_space - start); + start = curpos + _value_separator_length; + curpos = start; + p1 = 0; + non_space = 0; + } + } + } + + CHECK(curpos == line.size) << curpos << " vs " << line.size; + non_space = curpos; + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + + tmp_split_values.emplace_back(value + start, non_space - start); + for (const auto& slot : _file_slot_descs) { + auto it = _file_slot_index_map.find(slot->id()); + if (it == std::end(_file_slot_index_map)) { + std::stringstream ss; + ss << "Unknown _file_slot_index_map, slot_id=" << slot->id(); + return Status::InternalError(ss.str()); + } + int index = it->second; + CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file; + _split_values.emplace_back(tmp_split_values[index]); + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.h b/be/src/vec/exec/file_text_scanner.h new file mode 100644 index 0000000000..fc3ba6709c --- /dev/null +++ b/be/src/vec/exec/file_text_scanner.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/consts.h" +#include "exec/base_scanner.h" +#include "exec/decompressor.h" +#include "exec/line_reader.h" +#include "exec/plain_binary_line_reader.h" +#include "exec/plain_text_line_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "io/file_factory.h" +#include "io/file_reader.h" +#include "vec/exec/file_scanner.h" + +namespace doris::vectorized { +class FileTextScanner final : public FileScanner { +public: + FileTextScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector& ranges, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~FileTextScanner() override; + + Status open() override; + + Status get_next(Block* block, bool* eof) override; + void close() override; + +private: + Status _fill_file_columns(const Slice& line, vectorized::Block* _block); + Status _open_next_reader(); + Status _open_file_reader(); + Status _open_line_reader(); + Status _line_split_to_values(const Slice& line); + Status _split_line(const Slice& line); + // Reader + std::shared_ptr _cur_file_reader; + LineReader* _cur_line_reader; + bool _cur_line_reader_eof; + + // When we fetch range start from 0, header_type="csv_with_names" skip first line + // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line + // When we fetch range doesn't start + int _skip_lines; + std::vector _split_values; + std::string _value_separator; + std::string _line_delimiter; + int _value_separator_length; + int _line_delimiter_length; + + bool _success; +}; +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index ed7ba00dd6..55b66593e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -20,7 +20,7 @@ package org.apache.doris.analysis; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.IdGenerator; import org.apache.doris.thrift.TDescriptorTable; @@ -46,7 +46,7 @@ public class DescriptorTable { private final HashMap tupleDescs = new HashMap(); // List of referenced tables with no associated TupleDescriptor to ship to the BE. // For example, the output table of an insert query. - private final List referencedTables = new ArrayList
(); + private final List referencedTables = new ArrayList(); private final IdGenerator tupleIdGenerator = TupleId.createGenerator(); private final IdGenerator slotIdGenerator = SlotId.createGenerator(); private final HashMap slotDescs = Maps.newHashMap(); @@ -121,7 +121,7 @@ public class DescriptorTable { return tupleDescs.values(); } - public void addReferencedTable(Table table) { + public void addReferencedTable(TableIf table) { referencedTables.add(table); } @@ -151,7 +151,7 @@ public class DescriptorTable { public TDescriptorTable toThrift() { TDescriptorTable result = new TDescriptorTable(); - HashSet
referencedTbls = Sets.newHashSet(); + HashSet referencedTbls = Sets.newHashSet(); for (TupleDescriptor tupleD : tupleDescs.values()) { // inline view of a non-constant select has a non-materialized tuple descriptor // in the descriptor table just for type checking, which we need to skip @@ -161,7 +161,7 @@ public class DescriptorTable { // but its table has no id if (tupleD.getTable() != null && tupleD.getTable().getId() >= 0) { - referencedTbls.add((Table) tupleD.getTable()); + referencedTbls.add(tupleD.getTable()); } for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) { result.addToSlotDescriptors(slotD.toThrift()); @@ -169,11 +169,9 @@ public class DescriptorTable { } } - for (Table table : referencedTables) { - referencedTbls.add(table); - } + referencedTbls.addAll(referencedTables); - for (Table tbl : referencedTbls) { + for (TableIf tbl : referencedTbls) { result.addToTableDescriptors(tbl.toThrift()); } return result; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index 0bb58a4778..2d97c54c41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -22,7 +22,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.TableAliasGenerator; @@ -1186,7 +1186,7 @@ public class StmtRewriter { } continue; } - Table table = tableRef.getTable(); + TableIf table = tableRef.getTable(); String dbName = tableRef.getName().getDb(); if (dbName == null) { dbName = analyzer.getDefaultDb(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 107a9a3637..01c816a920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -21,7 +21,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -300,8 +300,8 @@ public class TableRef implements ParseNode, Writable { return !correlatedTupleIds.isEmpty(); } - public Table getTable() { - return (Table) desc.getTable(); + public TableIf getTable() { + return desc.getTable(); } public void setUsingClause(List colNames) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java index 96aaa615cc..b75bb10af2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java @@ -22,7 +22,6 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.thrift.TTupleDescriptor; @@ -353,7 +352,7 @@ public class TupleDescriptor { if (slotDescriptor.getColumn() != null) { TupleDescriptor parent = 
slotDescriptor.getParent(); Preconditions.checkState(parent != null); - Table table = (Table) parent.getTable(); + TableIf table = parent.getTable(); Preconditions.checkState(table != null); Long tableId = table.getId(); Set columnNames = tableIdToColumnNames.get(tableId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 46f1ea0b63..80d311fe54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -698,6 +698,10 @@ public class HiveMetaStoreClientHelper { return Type.DATE; case "timestamp": return Type.DATETIME; + case "float": + return Type.FLOAT; + case "double": + return Type.DOUBLE; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d2f1427e5c..b0066b28aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.alter.AlterCancelException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.TTableDescriptor; import java.util.List; import java.util.concurrent.TimeUnit; @@ -92,11 +93,14 @@ public interface TableIf { String getComment(boolean escapeQuota); + public TTableDescriptor toThrift(); + /** * Doris table type. */ public enum TableType { - MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, TABLE_VALUED_FUNCTION; + MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, + TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE; public String toEngineName() { switch (this) { @@ -122,6 +126,8 @@ public interface TableIf { return "Hudi"; case TABLE_VALUED_FUNCTION: return "Table_Valued_Function"; + case HMS_EXTERNAL_TABLE: + return "hms"; default: return null; } @@ -143,6 +149,7 @@ public interface TableIf { case HIVE: case HUDI: case TABLE_VALUED_FUNCTION: + case HMS_EXTERNAL_TABLE: return "EXTERNAL TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index a834e5508e..f43bd8edf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.TTableDescriptor; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; @@ -249,5 +250,10 @@ public class ExternalTable implements TableIf { @Override public String getComment(boolean escapeQuota) { return ""; + + } + + public TTableDescriptor toThrift() { + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 9c6e9b12fe..fd909ac514 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -21,11 +21,16 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.thrift.THiveTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -38,6 +43,13 @@ public class HMSExternalTable extends ExternalTable { private final String metastoreUri; private final String dbName; private org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + private DLAType dlaType = null; + + public enum DLAType { + HIVE, + HUDI, + ICEBERG + } /** * Create hive metastore external table. @@ -51,6 +63,7 @@ public class HMSExternalTable extends ExternalTable { super(id, name); this.dbName = dbName; this.metastoreUri = uri; + this.type = TableType.HMS_EXTERNAL_TABLE; init(); } @@ -58,11 +71,11 @@ public class HMSExternalTable extends ExternalTable { getRemoteTable(); if (remoteTable.getParameters().containsKey("table_type") && remoteTable.getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) { - type = TableType.ICEBERG; + dlaType = DLAType.ICEBERG; } else if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) { - type = TableType.HUDI; + dlaType = DLAType.HUDI; } else { - type = TableType.HIVE; + dlaType = DLAType.HIVE; } } @@ -92,6 +105,7 @@ public class HMSExternalTable extends ExternalTable { if (fullSchema == null) { synchronized (this) { if (fullSchema == null) { + fullSchema = new ArrayList<>(); try { for (FieldSchema field : HiveMetaStoreClientHelper.getSchema(dbName, name, metastoreUri)) { fullSchema.add(new Column(field.getName(), @@ -191,4 +205,24 @@ public class HMSExternalTable extends ExternalTable { public String getDbName() { return dbName; } + + /** + * Get the DLA type so the scan node can get the right information. 
+ */ + public DLAType getDlaType() { + return dlaType; + } + + @Override + public TTableDescriptor toThrift() { + THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + fullSchema.size(), 0, getName(), ""); + tTableDescriptor.setHiveTable(tHiveTable); + return tTableDescriptor; + } + + public String getMetastoreUri() { + return metastoreUri; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 60dd5a8b33..8367bc8b85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -49,7 +49,6 @@ import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPReadRequest; import org.apache.doris.thrift.TBrokerPWriteRequest; -import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TBrokerReadResponse; import org.apache.doris.thrift.TBrokerRenamePathRequest; import org.apache.doris.thrift.TBrokerVersion; @@ -91,25 +90,26 @@ public class BrokerUtil { public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; - public static void generateHdfsParam(Map properties, TBrokerRangeDesc rangeDesc) { - rangeDesc.setHdfsParams(new THdfsParams()); - rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>()); + public static THdfsParams generateHdfsParam(Map properties) { + THdfsParams tHdfsParams = new THdfsParams(); + tHdfsParams.setHdfsConf(new ArrayList<>()); for (Map.Entry property : properties.entrySet()) { if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) { - rangeDesc.hdfs_params.setFsName(property.getValue()); + tHdfsParams.setFsName(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) { - rangeDesc.hdfs_params.setUser(property.getValue()); + tHdfsParams.setUser(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) { - rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue()); + tHdfsParams.setHdfsKerberosPrincipal(property.getValue()); } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) { - rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue()); + tHdfsParams.setHdfsKerberosKeytab(property.getValue()); } else { THdfsConf hdfsConf = new THdfsConf(); hdfsConf.setKey(property.getKey()); hdfsConf.setValue(property.getValue()); - rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf); + tHdfsParams.hdfs_conf.add(hdfsConf); } } + return tHdfsParams; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java index 192d25b6a4..116efe65e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java @@ -117,6 +117,10 @@ public class DataSourceMgr implements Writable { return dbNames; } + public DataSourceIf getExternalDatasource(String name) { + return nameToCatalogs.get(name); + } + private void writeLock() { lock.writeLock().lock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java index b7097ff4cf..4d1a84c4b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java @@ -48,8 +48,9 @@ public class HMSExternalDataSource extends ExternalDataSource { //Cache of db name to db id. private ConcurrentHashMap dbNameToId = new ConcurrentHashMap(); - private AtomicLong nextId = new AtomicLong(0); + private static final AtomicLong nextId = new AtomicLong(0); + private boolean initialized = false; protected String hiveMetastoreUris; protected HiveMetaStoreClient client; @@ -57,34 +58,21 @@ public class HMSExternalDataSource extends ExternalDataSource { * Default constructor for HMSExternalDataSource. */ public HMSExternalDataSource(String name, Map props) { - setName(name); - getDsProperty().setProperties(props); - setType("hms"); - } - - /** - * Hive metastore data source implementation. - * - * @param hiveMetastoreUris e.g. thrift://127.0.0.1:9083 - */ - public HMSExternalDataSource(long id, String name, String type, DataSourceProperty dsProperty, - String hiveMetastoreUris) throws DdlException { - this.id = id; + this.id = nextId.incrementAndGet(); this.name = name; - this.type = type; - this.dsProperty = dsProperty; - this.hiveMetastoreUris = hiveMetastoreUris; - init(); + this.type = "hms"; + this.dsProperty = new DataSourceProperty(); + this.dsProperty.setProperties(props); + this.hiveMetastoreUris = props.getOrDefault("hive.metastore.uris", "thrift://127.0.0.1:9083"); } - private void init() throws DdlException { + private void init() { HiveConf hiveConf = new HiveConf(); hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreUris); try { client = new HiveMetaStoreClient(hiveConf); } catch (MetaException e) { LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage()); - throw new DdlException("Create HMSExternalDataSource failed.", e); } List allDatabases; try { @@ -102,8 +90,20 @@ public class HMSExternalDataSource extends ExternalDataSource { } } + /** + * The datasource can't be initialized at creation time because the external datasource may depend on a third-party system. + * So make sure the client of the third-party system is initialized before any method is called. + */ + private synchronized void makeSureInitialized() { + if (!initialized) { + init(); + initialized = true; + } + } + @Override public List listDatabaseNames(SessionContext ctx) { + makeSureInitialized(); try { List allDatabases = client.getAllDatabases(); // Update the db name to id map. 
@@ -119,6 +119,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public List listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); try { return client.getAllTables(dbName); } catch (MetaException e) { @@ -129,6 +130,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); try { return client.tableExists(dbName, tblName); } catch (TException e) { @@ -140,6 +142,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Nullable @Override public ExternalDatabase getDbNullable(String dbName) { + makeSureInitialized(); try { client.getDatabase(dbName); } catch (TException e) { @@ -156,6 +159,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Nullable @Override public ExternalDatabase getDbNullable(long dbId) { + makeSureInitialized(); for (Map.Entry entry : dbNameToId.entrySet()) { if (entry.getValue() == dbId) { return new HMSExternalDatabase(this, dbId, entry.getKey(), hiveMetastoreUris); @@ -230,6 +234,7 @@ public class HMSExternalDataSource extends ExternalDataSource { @Override public List getDbIds() { + makeSureInitialized(); return Lists.newArrayList(dbNameToId.values()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 16f86de29f..a9a640e38d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -605,7 +605,8 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setHeaderType(headerType); // set hdfs params for hdfs file type. if (brokerDesc.getFileType() == TFileType.FILE_HDFS) { - BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties()); + rangeDesc.setHdfsParams(tHdfsParams); } return rangeDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java index 909f5cbe6d..cb57f1ec7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java @@ -292,11 +292,9 @@ public class HudiScanNode extends BrokerScanNode { return; } - THdfsParams hdfsParams = new THdfsParams(); String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); String fsName = fullPath.replace(filePath, ""); - hdfsParams.setFsName(fsName); Log.debug("Hudi path's host is " + fsName); TFileFormatType fileFormatType = null; @@ -319,7 +317,7 @@ public class HudiScanNode extends BrokerScanNode { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, fileFormatType, partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc); - rangeDesc.setHdfsParams(hdfsParams); + rangeDesc.getHdfsParams().setFsName(fsName); rangeDesc.setReadByColumnDef(true); curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); @@ -350,7 +348,8 @@ public class HudiScanNode extends BrokerScanNode { // set hdfs params for hdfs file type. 
switch (brokerDesc.getFileType()) { case FILE_HDFS: - BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties()); + rangeDesc.setHdfsParams(tHdfsParams); break; default: break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 3e9a3ae137..748d33a0b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -63,6 +63,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.ExternalFileScanNode; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -1726,6 +1727,9 @@ public class SingleNodePlanner { scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction()); break; + case HMS_EXTERNAL_TABLE: + scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HMS_FILE_SCAN_NODE"); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 0c6e66e2d2..7b77dd457f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -19,17 +19,11 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -42,12 +36,15 @@ import org.apache.doris.resource.Tag; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; -import org.apache.doris.thrift.TBrokerRangeDesc; -import org.apache.doris.thrift.TBrokerScanNode; -import org.apache.doris.thrift.TBrokerScanRange; -import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanNode; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; @@ 
-61,7 +58,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; @@ -69,7 +65,6 @@ import org.apache.logging.log4j.Logger; import org.mortbay.log.Log; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -88,9 +83,8 @@ public class ExternalFileScanNode extends ExternalScanNode { private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; private static class ParamCreateContext { - public TBrokerScanRangeParams params; + public TFileScanRangeParams params; public TupleDescriptor srcTupleDescriptor; - public Map slotDescByName; } private static class BackendPolicy { @@ -135,16 +129,12 @@ public class ExternalFileScanNode extends ExternalScanNode { } } - private enum DLAType { - HIVE, - HUDI, - ICE_BERG - } - private final BackendPolicy backendPolicy = new BackendPolicy(); private final ParamCreateContext context = new ParamCreateContext(); + private final List partitionKeys = new ArrayList<>(); + private List scanRangeLocations; private final HMSExternalTable hmsTable; @@ -157,17 +147,17 @@ public class ExternalFileScanNode extends ExternalScanNode { public ExternalFileScanNode( PlanNodeId id, TupleDescriptor desc, - String planNodeName) throws MetaNotFoundException { - super(id, desc, planNodeName, StatisticalType.BROKER_SCAN_NODE); + String planNodeName) { - this.hmsTable = (HMSExternalTable) desc.getTable(); + super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE); - DLAType type = getDLAType(); - switch (type) { + this.hmsTable = (HMSExternalTable) this.desc.getTable(); + + switch (this.hmsTable.getDlaType()) { case HUDI: this.scanProvider = new ExternalHudiScanProvider(this.hmsTable); break; - case ICE_BERG: + case ICEBERG: this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable); break; case HIVE: @@ -178,61 +168,63 @@ public class ExternalFileScanNode extends ExternalScanNode { } } - private DLAType getDLAType() throws MetaNotFoundException { - if (hmsTable.getRemoteTable().getParameters().containsKey("table_type") - && hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) { - return DLAType.ICE_BERG; - } else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) { - return DLAType.HUDI; - } else { - return DLAType.HIVE; - } - } - @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); backendPolicy.init(); - initContext(context); + initContext(); } - private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException { + private void initContext() throws DdlException, MetaNotFoundException { context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); - context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - context.params = new TBrokerScanRangeParams(); + context.params = new TFileScanRangeParams(); if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) { Map serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) ? 
HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim"); String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim")) ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim"); - context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]); - context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]); - context.params.setColumnSeparatorStr(columnSeparator); - context.params.setLineDelimiterStr(lineDelimiter); - context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length); - context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length); + + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setLineDelimiterStr(lineDelimiter); + textParams.setColumnSeparatorStr(columnSeparator); + + context.params.setTextParams(textParams); } - Map slotDescByName = Maps.newHashMap(); + context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); + // Need to recompute the memory layout after setting some slot descriptors to nullable + context.srcTupleDescriptor.computeStatAndMemLayout(); + + Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); List columns = hmsTable.getBaseSchema(false); for (Column column : columns) { SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setType(column.getType()); slotDesc.setIsMaterialized(true); slotDesc.setIsNullable(true); - slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR)); - context.params.addToSrcSlotIds(slotDesc.getId().asInt()); + slotDesc.setColumn(new Column(column)); slotDescByName.put(column.getName(), slotDesc); } - context.slotDescByName = slotDescByName; + + // Hive tables must extract partition values from the path, while hudi/iceberg tables keep partition fields in the file. + partitionKeys.addAll(scanProvider.getPathPartitionKeys()); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slotId); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + + context.params.addToRequiredSlots(slotInfo); + } } @Override public void finalize(Analyzer analyzer) throws UserException { try { - finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor); buildScanRange(); } catch (IOException e) { LOG.error("Finalize failed.", e); @@ -248,50 +240,40 @@ public class ExternalFileScanNode extends ExternalScanNode { return; } - THdfsParams hdfsParams = new THdfsParams(); String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); String fsName = fullPath.replace(filePath, ""); - hdfsParams.setFsName(fsName); - List partitionKeys = new ArrayList<>(); - for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) { - partitionKeys.add(fieldSchema.getName()); - } + // TODO: currently every split is assigned one scan range; they could be merged as an optimization. 
for (InputSplit split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TScanRangeLocations curLocations = newLocations(context.params); List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), partitionKeys); - int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size(); - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath, - numberOfColumnsFromFile); - rangeDesc.setHdfsParams(hdfsParams); - rangeDesc.setReadByColumnDef(true); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); + rangeDesc.getHdfsParams().setFsName(fsName); - curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); - // Put the last file - if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { - scanRangeLocations.add(curLocations); - } + scanRangeLocations.add(curLocations); } } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params) { - // Generate on broker scan range - TBrokerScanRange brokerScanRange = new TBrokerScanRange(); - brokerScanRange.setParams(params); - brokerScanRange.setBrokerAddresses(new ArrayList<>()); + private TScanRangeLocations newLocations(TFileScanRangeParams params) { + // Generate on file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); TScanRange scanRange = new TScanRange(); - scanRange.setBrokerScanRange(brokerScanRange); + scanRange.setExtScanRange(externalScanRange); // Locations TScanRangeLocations locations = new TScanRangeLocations(); @@ -306,66 +288,24 @@ public class ExternalFileScanNode extends ExternalScanNode { return locations; } - private TBrokerRangeDesc createBrokerRangeDesc( + private TFileRangeDesc createFileRangeDesc( FileSplit fileSplit, - List columnsFromPath, - int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException { - TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); + List columnsFromPath) throws DdlException, MetaNotFoundException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); rangeDesc.setFileType(scanProvider.getTableFileType()); rangeDesc.setFormatType(scanProvider.getTableFormatType()); rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - rangeDesc.setSplittable(true); rangeDesc.setStartOffset(fileSplit.getStart()); rangeDesc.setSize(fileSplit.getLength()); - rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); rangeDesc.setColumnsFromPath(columnsFromPath); // set hdfs params for hdfs file type. 
if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc); + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties()); + rangeDesc.setHdfsParams(tHdfsParams); } return rangeDesc; } - private void finalizeParams( - Map slotDescByName, - TBrokerScanRangeParams params, - TupleDescriptor srcTupleDesc) throws UserException { - Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); - for (SlotDescriptor destSlotDesc : desc.getSlots()) { - Expr expr; - SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); - if (srcSlotDesc != null) { - destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); - // If dest is allow null, we set source to nullable - if (destSlotDesc.getColumn().isAllowNull()) { - srcSlotDesc.setIsNullable(true); - } - expr = new SlotRef(srcSlotDesc); - } else { - Column column = destSlotDesc.getColumn(); - if (column.getDefaultValue() != null) { - expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - throw new AnalysisException("column has no source field, column=" + column.getName()); - } - } - } - - expr = castToSlot(destSlotDesc, expr); - params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); - } - params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); - params.setDestTupleId(desc.getId().asInt()); - params.setStrictMode(false); - params.setSrcTupleId(srcTupleDesc.getId().asInt()); - - // Need re compute memory layout after set some slot descriptor to nullable - srcTupleDesc.computeStatAndMemLayout(); - } - @Override public int getNumInstances() { return scanRangeLocations.size(); @@ -373,18 +313,19 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override protected void toThrift(TPlanNode planNode) { - planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE); - TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); + planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); + TFileScanNode fileScanNode = new TFileScanNode(); + fileScanNode.setTupleId(desc.getId().asInt()); if (!preFilterConjuncts.isEmpty()) { if (Config.enable_vectorized_load && vpreFilterConjunct != null) { - brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); + fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); } else { for (Expr e : preFilterConjuncts) { - brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + fileScanNode.addToPreFilterExprs(e.treeToThrift()); } } } - planNode.setBrokerScanNode(brokerScanNode); + planNode.setFileScanNode(fileScanNode); } @Override @@ -394,15 +335,8 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - String url; - try { - url = scanProvider.getMetaStoreUrl(); - } catch (MetaNotFoundException e) { - LOG.warn("Can't get url error", e); - url = "Can't get url error."; - } return prefix + "DATABASE: " + hmsTable.getDbName() + "\n" + prefix + "TABLE: " + hmsTable.getName() + "\n" - + prefix + "HIVE URL: " + url + "\n"; + + prefix + "HIVE URL: " + scanProvider.getMetaStoreUrl() + "\n"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java index 
2fc626ab70..ebdd7a768a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java @@ -39,11 +39,13 @@ public interface ExternalFileScanProvider { TFileType getTableFileType(); - String getMetaStoreUrl() throws MetaNotFoundException; + String getMetaStoreUrl(); InputSplit[] getSplits(List exprs) throws IOException, UserException; Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; Map getTableProperties() throws MetaNotFoundException; + + List getPathPartitionKeys() throws DdlException, MetaNotFoundException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java index 979a5a29d3..626870d43e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java @@ -28,7 +28,6 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -77,8 +76,8 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider { } @Override - public String getMetaStoreUrl() throws MetaNotFoundException { - return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.name()); + public String getMetaStoreUrl() { + return hmsTable.getMetastoreUri(); } @Override @@ -146,4 +145,9 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider { public Map getTableProperties() throws MetaNotFoundException { return hmsTable.getRemoteTable().getParameters(); } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java index 2951c1fb7a..1638d9429c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java @@ -19,8 +19,12 @@ package org.apache.doris.planner.external; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.thrift.TFileFormatType; +import java.util.Collections; +import java.util.List; + /** * A file scan provider for hudi. * HudiProvier is extended with hive since they both use input format interface to get the spilt. 
@@ -35,4 +39,9 @@ public class ExternalHudiScanProvider extends ExternalHiveScanProvider { public TFileFormatType getTableFormatType() throws DdlException { return TFileFormatType.FORMAT_PARQUET; } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return Collections.emptyList(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java index 28ba3dad0a..6f321fc4b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java @@ -18,16 +18,15 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.IcebergProperty; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.external.iceberg.HiveCatalog; import org.apache.doris.external.iceberg.util.IcebergUtils; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -39,7 +38,10 @@ import org.apache.iceberg.expressions.Expression; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * A file scan provider for iceberg. @@ -98,8 +100,20 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider { } private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { - HiveCatalog hiveCatalog = new HiveCatalog(); - hiveCatalog.initialize(new IcebergProperty(getTableProperties())); + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = new Configuration(); + hiveCatalog.setConf(conf); + // initialize hive catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("hive.metastore.uris", getMetaStoreUrl()); + catalogProperties.put("uri", getMetaStoreUrl()); + hiveCatalog.initialize("hive", catalogProperties); + return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName())); } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return Collections.emptyList(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index cc806f7b3a..4b23337038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -45,4 +45,5 @@ public enum StatisticalType { TABLE_FUNCTION_NODE, UNION_NODE, TABLE_VALUED_FUNCTION_NODE, + FILE_SCAN_NODE, } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4dcddcfd1e..1b627f4879 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -22,6 +22,7 @@ include "Exprs.thrift" include "Types.thrift" include "Opcodes.thrift" include "Partitions.thrift" +include "Descriptors.thrift" enum TPlanNodeType { 
OLAP_SCAN_NODE, @@ -53,6 +54,7 @@ enum TPlanNodeType { ODBC_SCAN_NODE, TABLE_FUNCTION_NODE, TABLE_VALUED_FUNCTION_SCAN_NODE, + FILE_SCAN_NODE, } // phases of an execution node @@ -213,8 +215,46 @@ struct TEsScanRange { 4: required i32 shard_id } -struct TFileScanRange { +struct TFileTextScanRangeParams { + 1: optional string column_separator_str; + 2: optional string line_delimiter_str; +} +struct TFileScanSlotInfo { + 1: optional Types.TSlotId slot_id; + 2: optional bool is_file_slot; +} + +struct TFileScanRangeParams { + // use src_tuple_id to get all slots from the src table, including both file slots and partition slots. + 1: optional Types.TTupleId src_tuple_id; + // num_of_columns_from_file can split the all_file_slot and all_partition_slot + 2: optional i32 num_of_columns_from_file; + // all selected slots, which may be composed of file and partition values. + 3: optional list required_slots; + + 4: optional TFileTextScanRangeParams text_params; +} + +struct TFileRangeDesc { + 1: optional Types.TFileType file_type; + 2: optional TFileFormatType format_type; + // Path of this range + 3: optional string path; + // Start offset of this range in the file + 4: optional i64 start_offset; + // Size of this range; if size = -1, read to the end of the file + 5: optional i64 size; + // columns parsed from the file path; they should come after the columns read from the file + 6: optional list columns_from_path; + + 7: optional THdfsParams hdfs_params; +} + +// HDFS file scan range +struct TFileScanRange { + 1: optional list ranges + 2: optional TFileScanRangeParams params } // Scan range for external datasource, such as file on hdfs, es datanode, etc. @@ -281,6 +321,11 @@ struct TBrokerScanNode { 4: optional list pre_filter_exprs } +struct TFileScanNode { + 1: optional Types.TTupleId tuple_id + 2: optional list pre_filter_exprs +} + struct TEsScanNode { 1: required Types.TTupleId tuple_id 2: optional map properties @@ -830,6 +875,9 @@ struct TPlanNode { // output column 42: optional list output_slot_ids 43: optional TTableValuedFunctionScanNode table_valued_func_scan_node + + // file scan node + 44: optional TFileScanNode file_scan_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first