diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 9a2b335728..7f6f07d1c2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -61,6 +61,7 @@ #include "vec/core/block.h" #include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" +#include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" @@ -584,7 +585,12 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::FILE_SCAN_NODE: - *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + // *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + if (config::enable_new_scan_node) { + *node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + } return Status::OK(); @@ -704,7 +710,8 @@ void ExecNode::try_do_aggregate_serde_improve() { // TODO(cmy): should be removed when NewOlapScanNode is ready ExecNode* child0 = agg_node[0]->_children[0]; - if (typeid(*child0) == typeid(vectorized::NewOlapScanNode)) { + if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) || + typeid(*child0) == typeid(vectorized::NewFileScanNode)) { vectorized::VScanNode* scan_node = static_cast(agg_node[0]->_children[0]); scan_node->set_no_agg_finalize(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 6274b47174..aab0b44379 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -46,6 +46,7 @@ #include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/core/block.h" +#include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vexchange_node.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -166,7 +167,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, for (int i = 0; i < scan_nodes.size(); ++i) { // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. ExecNode* node = scan_nodes[i]; - if (typeid(*node) == typeid(vectorized::NewOlapScanNode)) { + if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || + typeid(*node) == typeid(vectorized::NewFileScanNode)) { vectorized::VScanNode* scan_node = static_cast(scan_nodes[i]); const std::vector& scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index f1fc4b4ec2..23f8c08fad 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -239,6 +239,10 @@ set(VEC_FILES exec/scan/scanner_scheduler.cpp exec/scan/new_olap_scan_node.cpp exec/scan/new_olap_scanner.cpp + exec/scan/new_file_arrow_scanner.cpp + exec/scan/new_file_scan_node.cpp + exec/scan/new_file_scanner.cpp + exec/scan/new_file_text_scanner.cpp ) add_library(Vec STATIC diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp new file mode 100644 index 0000000000..acc98194d7 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_file_arrow_scanner.h" + +#include "exec/arrow/orc_reader.h" +#include "exec/arrow/parquet_reader.h" +#include "io/file_factory.h" +#include "vec/exec/scan/vscan_node.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +namespace doris::vectorized { + +NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, + int64_t limit, const TFileScanRange& scan_range, + MemTracker* tracker, RuntimeProfile* profile) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + _cur_file_reader(nullptr), + _cur_file_eof(false), + _batch(nullptr), + _arrow_batch_cur_idx(0) {} + +Status NewFileArrowScanner::open(RuntimeState* state) { + RETURN_IF_ERROR(NewFileScanner::open(state)); + // SCOPED_TIMER(_parent->_reader_init_timer); + // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); + return Status::OK(); +} + +Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + // init arrow batch + { + Status st = _init_arrow_batch_if_necessary(); + if (!st.ok()) { + if (!st.is_end_of_file()) { + return st; + } + *eof = true; + return Status::OK(); + } + } + + *eof = false; + RETURN_IF_ERROR(init_block(block)); + // convert arrow batch to block until reach the batch_size + while (!_scanner_eof) { + // cast arrow type to PT0 and append it to block + // for example: arrow::Type::INT16 => TYPE_SMALLINT + RETURN_IF_ERROR(_append_batch_to_block(block)); + // finalize the block if full + if (_rows >= _state->batch_size()) { + break; + } + auto status = _next_arrow_batch(); + // if ok, append the batch to the columns + if (status.ok()) { + continue; + } + // return error if not EOF + if (!status.is_end_of_file()) { + return status; + } + _cur_file_eof = true; + break; + } + + if (_scanner_eof && block->rows() == 0) { + *eof = true; + } + // return finalize_block(block, eof); + return Status::OK(); +} + +Status NewFileArrowScanner::_init_arrow_batch_if_necessary() { + // 1. init batch if first time + // 2. reset reader if end of file + Status status = Status::OK(); + if (_scanner_eof) { + return Status::EndOfFile("EOF"); + } + if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { + return _next_arrow_batch(); + } + return status; +} + +Status NewFileArrowScanner::_append_batch_to_block(Block* block) { + size_t num_elements = std::min((_state->batch_size() - block->rows()), + (_batch->num_rows() - _arrow_batch_cur_idx)); + for (auto i = 0; i < _file_slot_descs.size(); ++i) { + SlotDescriptor* slot_desc = _file_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + std::string real_column_name = _cur_file_reader->is_case_sensitive() + ? slot_desc->col_name() + : slot_desc->col_name_lower_case(); + auto* array = _batch->GetColumnByName(real_column_name).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column( + array, _arrow_batch_cur_idx, column_with_type_and_name.column, + column_with_type_and_name.type, num_elements, _state->timezone_obj())); + } + _rows += num_elements; + _arrow_batch_cur_idx += num_elements; + return _fill_columns_from_path(block, num_elements); +} + +Status NewFileArrowScanner::_next_arrow_batch() { + _arrow_batch_cur_idx = 0; + // first, init file reader + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + } + // second, loop until find available arrow batch or EOF + while (!_scanner_eof) { + RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); + if (_cur_file_eof) { + RETURN_IF_ERROR(_open_next_reader()); + _cur_file_eof = false; + continue; + } + if (_batch->num_rows() == 0) { + continue; + } + return Status::OK(); + } + return Status::EndOfFile("EOF"); +} + +Status NewFileArrowScanner::_open_next_reader() { + // open_file_reader + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } + + while (true) { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TFileRangeDesc& range = _ranges[_next_range++]; + std::unique_ptr file_reader; + + RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, + range, file_reader)); + RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { + file_reader->close(); + continue; + } + + int32_t num_of_columns_from_file = _file_slot_descs.size(); + + _cur_file_reader = + _new_arrow_reader(file_reader.release(), _state->batch_size(), + num_of_columns_from_file, range.start_offset, range.size); + + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id()); + // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty. + Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, + _state->timezone()); + if (status.is_end_of_file()) { + continue; + } else { + if (!status.ok()) { + std::stringstream ss; + ss << " file: " << range.path << " error:" << status.get_error_msg(); + return Status::InternalError(ss.str()); + } else { + // _update_profile(_cur_file_reader->statistics()); + return status; + } + } + } +} + +NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, + int64_t limit, const TFileScanRange& scan_range, + MemTracker* tracker, RuntimeProfile* profile) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) { + // _init_profiles(profile); +} + +ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* file_reader, + int64_t batch_size, + int32_t num_of_columns_from_file, + int64_t range_start_offset, + int64_t range_size) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, + range_start_offset, range_size, false); +} + +NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) {} + +ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file, + int64_t range_start_offset, + int64_t range_size) { + return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset, + range_size, false); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h new file mode 100644 index 0000000000..3a85ce65c5 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "exprs/bloomfilter_predicate.h" +#include "exprs/function_filter.h" +#include "vec/exec/scan/new_file_scanner.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { +class NewFileArrowScanner : public NewFileScanner { +public: + NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile); + Status open(RuntimeState* state) override; + +protected: + Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) = 0; + +private: + Status _open_next_reader(); + Status _next_arrow_batch(); + Status _init_arrow_batch_if_necessary(); + Status _append_batch_to_block(Block* block); + +private: + // Reader + ArrowReaderWrap* _cur_file_reader; + bool _cur_file_eof; // is read over? + std::shared_ptr _batch; + size_t _arrow_batch_cur_idx; +}; + +class NewFileParquetScanner final : public NewFileArrowScanner { +public: + NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile); + + ~NewFileParquetScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; + + void _init_profiles(RuntimeProfile* profile) override {}; +}; + +class NewFileORCScanner final : public NewFileArrowScanner { +public: + NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile); + + ~NewFileORCScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; + void _init_profiles(RuntimeProfile* profile) override {}; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp new file mode 100644 index 0000000000..c978ef0867 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_file_scan_node.h" + +#include "vec/columns/column_const.h" +#include "vec/exec/scan/new_file_arrow_scanner.h" +#include "vec/exec/scan/new_file_text_scanner.h" +#include "vec/exec/scan/new_olap_scanner.h" +#include "vec/functions/in.h" + +namespace doris::vectorized { + +NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs), _file_scan_node(tnode.file_scan_node) { + _output_tuple_id = tnode.file_scan_node.tuple_id; +} + +Status NewFileScanNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::prepare(state)); + _scanner_mem_tracker = std::make_unique("NewFileScanners"); + return Status::OK(); +} + +void NewFileScanNode::set_scan_ranges(const std::vector& scan_ranges) { + int max_scanners = config::doris_scanner_thread_pool_thread_num; + if (scan_ranges.size() <= max_scanners) { + _scan_ranges = scan_ranges; + } else { + // There is no need for the number of scanners to exceed the number of threads in thread pool. + _scan_ranges.clear(); + auto range_iter = scan_ranges.begin(); + for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { + _scan_ranges.push_back(*range_iter); + } + for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { + if (i == max_scanners) { + i = 0; + } + auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; + auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + _scan_ranges.shrink_to_fit(); + LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); + } +} + +Status NewFileScanNode::_init_profile() { + VScanNode::_init_profile(); + return Status::OK(); +} + +Status NewFileScanNode::_process_conjuncts() { + RETURN_IF_ERROR(VScanNode::_process_conjuncts()); + if (_eos) { + return Status::OK(); + } + // TODO: Push conjuncts down to reader. + return Status::OK(); +} + +Status NewFileScanNode::_init_scanners(std::list* scanners) { + if (_scan_ranges.empty()) { + _eos = true; + return Status::OK(); + } + + for (auto& scan_range : _scan_ranges) { + VScanner* scanner = + (VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range); + scanners->push_back(scanner); + } + + return Status::OK(); +} + +VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { + NewFileScanner* scanner = nullptr; + switch (scan_range.params.format_type) { + case TFileFormatType::FORMAT_PARQUET: + scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range, + _scanner_mem_tracker.get(), runtime_profile()); + break; + case TFileFormatType::FORMAT_ORC: + scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, + _scanner_mem_tracker.get(), runtime_profile()); + break; + + default: + scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, + _scanner_mem_tracker.get(), runtime_profile()); + break; + } + _scanner_pool.add(scanner); + scanner->prepare(_vconjunct_ctx_ptr.get()); + // TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead? + scanner->reg_conjunct_ctxs(_conjunct_ctxs); + return scanner; +} + +}; // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h new file mode 100644 index 0000000000..c523bbb737 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +class NewFileScanNode : public VScanNode { +public: + NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + Status prepare(RuntimeState* state) override; + + void set_scan_ranges(const std::vector& scan_ranges) override; + +protected: + Status _init_profile() override; + Status _process_conjuncts() override; + Status _init_scanners(std::list* scanners) override; + +private: + VScanner* _create_scanner(const TFileScanRange& scan_range); + +private: + std::vector _scan_ranges; + TFileScanNode _file_scan_node; + std::unique_ptr _scanner_mem_tracker; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp new file mode 100644 index 0000000000..4786caeefc --- /dev/null +++ b/be/src/vec/exec/scan/new_file_scanner.cpp @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_file_scanner.h" + +#include + +#include + +#include "common/logging.h" +#include "common/utils.h" +#include "exec/exec_node.h" +#include "exec/text_converter.hpp" +#include "exprs/expr_context.h" +#include "runtime/descriptors.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" +#include "vec/exec/scan/new_file_scan_node.h" + +namespace doris::vectorized { + +NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile) + : VScanner(state, static_cast(parent), limit, tracker), + _params(scan_range.params), + _ranges(scan_range.ranges), + _next_range(0), + _profile(profile) {} + +Status NewFileScanner::open(RuntimeState* state) { + RETURN_IF_ERROR(VScanner::open(state)); + RETURN_IF_ERROR(_init_expr_ctxes()); + return Status::OK(); +} + +Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + if (vconjunct_ctx_ptr != nullptr) { + // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. + RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + } + + return Status::OK(); +} + +Status NewFileScanner::_init_expr_ctxes() { + const TupleDescriptor* src_tuple_desc = + _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); + if (src_tuple_desc == nullptr) { + std::stringstream ss; + ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; + return Status::InternalError(ss.str()); + } + DCHECK(!_ranges.empty()); + + std::map _full_src_index_map; + std::map _full_src_slot_map; + int index = 0; + for (const auto& slot_desc : src_tuple_desc->slots()) { + _full_src_slot_map.emplace(slot_desc->id(), slot_desc); + _full_src_index_map.emplace(slot_desc->id(), index++); + } + + _num_of_columns_from_file = _params.num_of_columns_from_file; + for (const auto& slot_info : _params.required_slots) { + auto slot_id = slot_info.slot_id; + auto it = _full_src_slot_map.find(slot_id); + if (it == std::end(_full_src_slot_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_id; + return Status::InternalError(ss.str()); + } + _required_slot_descs.emplace_back(it->second); + if (slot_info.is_file_slot) { + _file_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _file_slot_index_map.emplace(slot_id, iti->second); + } else { + _partition_slot_descs.emplace_back(it->second); + auto iti = _full_src_index_map.find(slot_id); + _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); + } + } + + _row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector({_params.src_tuple_id}), + std::vector({false}))); + + // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor + if (!_pre_filter_texprs.empty()) { + // for vectorized, preceding filter exprs should be compounded to one passed from fe. + DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + } + + return Status::OK(); +} + +Status NewFileScanner::init_block(vectorized::Block* block) { + (*block).clear(); + _rows = 0; + for (const auto& slot_desc : _required_slot_descs) { + if (slot_desc == nullptr) { + continue; + } + auto is_nullable = slot_desc->is_nullable(); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), + is_nullable); + if (data_type == nullptr) { + return Status::NotSupported( + fmt::format("Not support type for column:{}", slot_desc->col_name())); + } + MutableColumnPtr data_column = data_type->create_column(); + (*block).insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + return Status::OK(); +} + +Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) { + const TFileRangeDesc& range = _ranges.at(_next_range - 1); + if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { + for (const auto& slot_desc : _partition_slot_descs) { + if (slot_desc == nullptr) continue; + auto it = _partition_slot_index_map.find(slot_desc->id()); + if (it == std::end(_partition_slot_index_map)) { + std::stringstream ss; + ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id(); + return Status::InternalError(ss.str()); + } + const std::string& column_from_path = range.columns_from_path[it->second]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + + for (size_t j = 0; j < rows; ++j) { + _text_converter->write_vec_column(slot_desc, col_ptr, + const_cast(column_from_path.c_str()), + column_from_path.size(), true, false); + } + } + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h new file mode 100644 index 0000000000..8ef31df706 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_scanner.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/text_converter.h" +#include "exprs/bloomfilter_predicate.h" +#include "exprs/function_filter.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { + +class NewFileScanNode; + +class NewFileScanner : public VScanner { +public: + NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile); + + Status open(RuntimeState* state) override; + + Status prepare(VExprContext** vconjunct_ctx_ptr); + +protected: + virtual void _init_profiles(RuntimeProfile* profile) = 0; + + Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows); + Status init_block(vectorized::Block* block); + + std::unique_ptr _text_converter; + + const TFileScanRangeParams& _params; + + const std::vector& _ranges; + int _next_range; + + // Used for constructing tuple + std::vector _required_slot_descs; + // File source slot descriptors + std::vector _file_slot_descs; + // File slot id to index map. + std::map _file_slot_index_map; + // Partition source slot descriptors + std::vector _partition_slot_descs; + // Partition slot id to index map + std::map _partition_slot_index_map; + std::unique_ptr _row_desc; + + // Profile + RuntimeProfile* _profile; + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + + bool _scanner_eof = false; + int _rows = 0; + int _num_of_columns_from_file; + +private: + Status _init_expr_ctxes(); +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp new file mode 100644 index 0000000000..da7205e8d1 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_text_scanner.cpp @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_file_text_scanner.h" + +#include "exec/plain_text_line_reader.h" +#include "io/file_factory.h" +#include "util/utf8_check.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_line_reader_eof(false), + _skip_lines(0), + _success(false) {} + +Status NewFileTextScanner::open(RuntimeState* state) { + RETURN_IF_ERROR(NewFileScanner::open(state)); + if (_ranges.empty()) { + return Status::OK(); + } + _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); + return Status::OK(); +} + +Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(init_block(block)); + + const int batch_size = state->batch_size(); + *eof = false; + int current_rows = _rows; + while (_rows < batch_size && !_scanner_eof) { + if (_cur_line_reader == nullptr || _cur_line_reader_eof) { + RETURN_IF_ERROR(_open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + } + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + { + COUNTER_UPDATE(_rows_read_counter, 1); + RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); + } + if (_cur_line_reader_eof) { + RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows)); + current_rows = _rows; + } + } + if (_scanner_eof && block->rows() == 0) { + *eof = true; + } + return Status::OK(); +} + +Status NewFileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) { + RETURN_IF_ERROR(_line_split_to_values(line)); + if (!_success) { + // If not success, which means we met an invalid row, return. + return Status::OK(); + } + + for (int i = 0; i < _split_values.size(); ++i) { + auto slot_desc = _file_slot_descs[i]; + const Slice& value = _split_values[i]; + + auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + IColumn* col_ptr = const_cast(doris_column.get()); + _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false); + } + _rows++; + return Status::OK(); +} + +Status NewFileTextScanner::_line_split_to_values(const Slice& line) { + if (!validate_utf8(line.data, line.size)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + []() -> std::string { return "Unable to display"; }, + []() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "{}", "Unable to display"); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + _success = false; + return Status::OK(); + } + + RETURN_IF_ERROR(_split_line(line)); + + _success = true; + return Status::OK(); +} + +Status NewFileTextScanner::_open_next_reader() { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_open_line_reader()); + _next_range++; + + return Status::OK(); +} + +Status NewFileTextScanner::_open_file_reader() { + const TFileRangeDesc& range = _ranges[_next_range]; + RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range, + _cur_file_reader)); + return _cur_file_reader->open(); +} + +Status NewFileTextScanner::_open_line_reader() { + if (_cur_line_reader != nullptr) { + delete _cur_line_reader; + _cur_line_reader = nullptr; + } + + const TFileRangeDesc& range = _ranges[_next_range]; + int64_t size = range.size; + if (range.start_offset != 0) { + if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) { + std::stringstream ss; + ss << "For now we do not support split compressed file"; + return Status::InternalError(ss.str()); + } + size += 1; + // not first range will always skip one line + _skip_lines = 1; + } + + // open line reader + switch (_params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, + _line_delimiter, _line_delimiter_length); + break; + default: { + std::stringstream ss; + ss << "Unknown format type, cannot init line reader, type=" << _params.format_type; + return Status::InternalError(ss.str()); + } + } + + _cur_line_reader_eof = false; + + return Status::OK(); +} + +Status NewFileTextScanner::_split_line(const Slice& line) { + _split_values.clear(); + std::vector tmp_split_values; + tmp_split_values.reserve(_num_of_columns_from_file); + + const char* value = line.data; + size_t start = 0; // point to the start pos of next col value. + size_t curpos = 0; // point to the start pos of separator matching sequence. + size_t p1 = 0; // point to the current pos of separator matching sequence. + size_t non_space = 0; // point to the last pos of non_space charactor. + + // Separator: AAAA + // + // p1 + // ▼ + // AAAA + // 1000AAAA2000AAAA + // ▲ ▲ + // Start │ + // curpos + + while (curpos < line.size) { + if (*(value + curpos + p1) != _value_separator[p1]) { + // Not match, move forward: + curpos += (p1 == 0 ? 1 : p1); + p1 = 0; + } else { + p1++; + if (p1 == _value_separator_length) { + // Match a separator + non_space = curpos; + // Trim tailing spaces. Be consistent with hive and trino's behavior. + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + tmp_split_values.emplace_back(value + start, non_space - start); + start = curpos + _value_separator_length; + curpos = start; + p1 = 0; + non_space = 0; + } + } + } + + CHECK(curpos == line.size) << curpos << " vs " << line.size; + non_space = curpos; + if (_state->trim_tailing_spaces_for_external_table_query()) { + while (non_space > start && *(value + non_space - 1) == ' ') { + non_space--; + } + } + + tmp_split_values.emplace_back(value + start, non_space - start); + for (const auto& slot : _file_slot_descs) { + auto it = _file_slot_index_map.find(slot->id()); + if (it == std::end(_file_slot_index_map)) { + std::stringstream ss; + ss << "Unknown _file_slot_index_map, slot_id=" << slot->id(); + return Status::InternalError(ss.str()); + } + int index = it->second; + CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file; + _split_values.emplace_back(tmp_split_values[index]); + } + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h new file mode 100644 index 0000000000..bfc112c870 --- /dev/null +++ b/be/src/vec/exec/scan/new_file_text_scanner.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "exec/line_reader.h" +#include "exprs/bloomfilter_predicate.h" +#include "exprs/function_filter.h" +#include "vec/exec/scan/new_file_scanner.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { +class NewFileTextScanner : public NewFileScanner { +public: + NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, + const TFileScanRange& scan_range, MemTracker* tracker, + RuntimeProfile* profile); + + Status open(RuntimeState* state) override; + +protected: + void _init_profiles(RuntimeProfile* profile) override {} + Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + +private: + Status _fill_file_columns(const Slice& line, vectorized::Block* _block); + Status _open_next_reader(); + Status _open_file_reader(); + Status _open_line_reader(); + Status _line_split_to_values(const Slice& line); + Status _split_line(const Slice& line); + // Reader + std::shared_ptr _cur_file_reader; + LineReader* _cur_line_reader; + bool _cur_line_reader_eof; + + // When we fetch range start from 0, header_type="csv_with_names" skip first line + // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line + // When we fetch range doesn't start + int _skip_lines; + std::vector _split_values; + std::string _value_separator; + std::string _line_delimiter; + int _value_separator_length; + int _line_delimiter_length; + + bool _success; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 55739b2fa5..3e65392080 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -18,6 +18,7 @@ #pragma once #include "common/status.h" +#include "exprs/expr_context.h" #include "olap/tablet.h" #include "runtime/runtime_state.h" #include "vec/exprs/vexpr_context.h" @@ -62,6 +63,9 @@ protected: // Filter the output block finally. Status _filter_output_block(Block* block); + // to filter src tuple directly. + std::unique_ptr _vpre_filter_ctx_ptr; + public: VScanNode* get_parent() { return _parent; } @@ -98,6 +102,10 @@ public: VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; } + void reg_conjunct_ctxs(const std::vector& conjunct_ctxs) { + _conjunct_ctxs = conjunct_ctxs; + } + protected: void _discard_conjuncts() { if (_vconjunct_ctx) { @@ -151,6 +159,11 @@ protected: // watch to count the time wait for scanner thread MonotonicStopWatch _watch; int64_t _scanner_wait_worker_timer = 0; + + // File formats based push down predicate + std::vector _conjunct_ctxs; + + const std::vector _pre_filter_texprs; }; } // namespace doris::vectorized