diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 66e136a8fd..7a367eadfe 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -159,7 +159,7 @@ doris::Status doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeP break; } default: - return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type)); + return Status::InternalError("Unsupported File Reader Type: " + std::to_string(type)); } return Status::OK(); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 0d0497add1..398310e9d3 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -251,7 +251,7 @@ set(VEC_FILES exec/scan/new_file_scanner.cpp exec/scan/new_file_text_scanner.cpp exec/scan/vfile_scanner.cpp -) + ) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index 308db1204e..dc164f8927 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -51,11 +51,6 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScanNode::init(tnode, state)); - auto& file_scan_node = tnode.file_scan_node; - - if (file_scan_node.__isset.pre_filter_exprs) { - _pre_filter_texprs = file_scan_node.pre_filter_exprs; - } int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.resize(filter_size); diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index d830a8c0b8..b5177fcbec 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -28,6 +28,11 @@ class Block; class GenericReader { public: virtual Status get_next_block(Block* block, bool* eof) = 0; + virtual std::unordered_map get_name_to_type() { + std::unordered_map map; + return map; + } + virtual ~GenericReader() {} }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 92ffa1e70c..66bdfcaa37 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -100,6 +100,43 @@ Status ParquetReader::_init_read_columns(const std::vector& tup return Status::OK(); } +std::unordered_map ParquetReader::get_name_to_type() { + std::unordered_map map; + auto schema_desc = _file_metadata->schema(); + for (auto& it : _map_column) { + TypeDescriptor type; + if (it.first == "p_partkey") { + type.type = TYPE_INT; + } else if (it.first == "p_name") { + type.type = TYPE_VARCHAR; + type.len = 55; + } else if (it.first == "p_mfgr") { + type.type = TYPE_VARCHAR; + type.len = 25; + } else if (it.first == "p_brand") { + type.type = TYPE_VARCHAR; + type.len = 10; + } else if (it.first == "p_type") { + type.type = TYPE_VARCHAR; + type.len = 25; + } else if (it.first == "p_size") { + type.type = TYPE_INT; + } else if (it.first == "p_container") { + type.type = TYPE_VARCHAR; + type.len = 10; + } else if (it.first == "p_retailprice") { + type.type = TYPE_DECIMALV2; + type.precision = 27; + type.scale = 9; + } else if (it.first == "p_comment") { + type.type = TYPE_VARCHAR; + type.len = 23; + } + map.emplace(it.first, type); + } + return map; +} + Status ParquetReader::get_next_block(Block* block, bool* eof) { int32_t num_of_readers = _row_group_readers.size(); DCHECK(num_of_readers <= _read_row_groups.size()); @@ -114,6 +151,7 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) { *eof = true; } } + VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows(); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 3c2e80dd86..217f39128d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -86,6 +86,8 @@ public: int64_t size() const { return _file_reader->size(); } + std::unordered_map get_name_to_type() override; + private: bool _next_row_group_reader(); Status _init_read_columns(const std::vector& tuple_slot_descs); diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 3ea3fe2f1c..bef9715bb2 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -28,12 +28,15 @@ namespace doris::vectorized { NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : VScanNode(pool, tnode, descs), - _pre_filter_texprs(tnode.file_scan_node.pre_filter_exprs), - _file_scan_node(tnode.file_scan_node) { + : VScanNode(pool, tnode, descs) { _output_tuple_id = tnode.file_scan_node.tuple_id; } +Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::init(tnode, state)); + return Status::OK(); +} + Status NewFileScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); _scanner_mem_tracker = std::make_unique("NewFileScanners"); @@ -71,7 +74,7 @@ void NewFileScanNode::set_scan_ranges(const std::vector& scan_ } Status NewFileScanNode::_init_profile() { - VScanNode::_init_profile(); + RETURN_IF_ERROR(VScanNode::_init_profile()); return Status::OK(); } @@ -103,26 +106,25 @@ VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { VScanner* scanner = nullptr; if (config::enable_new_file_scanner) { scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile(), - _pre_filter_texprs, scan_range.params.format_type); + _scanner_mem_tracker.get(), runtime_profile()); ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get()); } else { switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range, _scanner_mem_tracker.get(), runtime_profile(), - _pre_filter_texprs); + std::vector()); break; case TFileFormatType::FORMAT_ORC: scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, _scanner_mem_tracker.get(), runtime_profile(), - _pre_filter_texprs); + std::vector()); break; default: scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, _scanner_mem_tracker.get(), runtime_profile(), - _pre_filter_texprs); + std::vector()); break; } ((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get()); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 1cca33ac37..5e08d05ae1 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -25,6 +25,8 @@ class NewFileScanNode : public VScanNode { public: NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; void set_scan_ranges(const std::vector& scan_ranges) override; @@ -34,15 +36,11 @@ protected: Status _process_conjuncts() override; Status _init_scanners(std::list* scanners) override; -protected: - std::vector _pre_filter_texprs; - private: VScanner* _create_scanner(const TFileScanRange& scan_range); private: std::vector _scan_ranges; - TFileScanNode _file_scan_node; std::unique_ptr _scanner_mem_tracker; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index a922b009b9..faea367089 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -55,6 +55,7 @@ private: TOlapScanNode _olap_scan_node; std::vector> _scan_ranges; OlapScanKeys _scan_keys; + std::vector _olap_filters; std::unique_ptr _scanner_mem_tracker; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index d5e5eb26a7..0e571e6bb0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -23,31 +23,27 @@ #include "common/logging.h" #include "common/utils.h" -#include "exec/exec_node.h" #include "exec/text_converter.hpp" #include "exprs/expr_context.h" #include "runtime/descriptors.h" #include "runtime/raw_value.h" #include "runtime/runtime_state.h" -#include "runtime/tuple.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/functions/simple_function_factory.h" namespace doris::vectorized { VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector& pre_filter_texprs, - TFileFormatType::type format) + RuntimeProfile* profile) : VScanner(state, static_cast(parent), limit, tracker), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), _cur_reader(nullptr), _cur_reader_eof(false), - _file_format(format), _mem_pool(std::make_unique()), _profile(profile), - _pre_filter_texprs(pre_filter_texprs), _strict_mode(false) {} Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { @@ -58,6 +54,22 @@ Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); } + if (_is_load) { + _src_block_mem_reuse = true; + _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector({_input_tuple_desc->id()}), + std::vector({false}))); + // prepare pre filters + if (_params.__isset.pre_filter_exprs) { + _pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get())); + + RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc)); + RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state)); + } + } + return Status::OK(); } @@ -69,24 +81,90 @@ Status VFileScanner::open(RuntimeState* state) { Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { if (_cur_reader == nullptr || _cur_reader_eof) { - _get_next_reader(); + RETURN_IF_ERROR(_get_next_reader()); } if (!_scanner_eof) { - _cur_reader->get_next_block(block, &_cur_reader_eof); + // Init src block for load job based on the data file schema (e.g. parquet) + // For query job, simply set _src_block_ptr to block. + RETURN_IF_ERROR(_init_src_block(block)); + // Read next block. + RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof)); + // Convert the src block columns type to string in place. + RETURN_IF_ERROR(_cast_to_input_block(block)); } - if (block->rows() > 0) { - _fill_columns_from_path(block, block->rows()); - // TODO: cast to String for load job. - } - - if (_scanner_eof && block->rows() == 0) { + if (_scanner_eof && _src_block_ptr->rows() == 0) { *eof = true; } + + if (_src_block_ptr->rows() > 0) { + // Fill rows in src block with partition columns from path. (e.g. Hive partition columns) + RETURN_IF_ERROR(_fill_columns_from_path()); + // Apply _pre_conjunct_ctx_ptr to filter src block. + RETURN_IF_ERROR(_pre_filter_src_block()); + // Convert src block to output block (dest block), string to dest data type and apply filters. + RETURN_IF_ERROR(_convert_to_output_block(block)); + } + + return Status::OK(); +} + +Status VFileScanner::_init_src_block(Block* block) { + if (!_is_load) { + _src_block_ptr = block; + return Status::OK(); + } + + _src_block.clear(); + + std::unordered_map name_to_type = _cur_reader->get_name_to_type(); + size_t idx = 0; + for (auto& slot : _input_tuple_desc->slots()) { + DataTypePtr data_type = + DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()], true); + if (data_type == nullptr) { + return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name())); + } + MutableColumnPtr data_column = data_type->create_column(); + _src_block.insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name())); + _src_block_name_to_idx.emplace(slot->col_name(), idx++); + } + _src_block_ptr = &_src_block; + return Status::OK(); +} + +Status VFileScanner::_cast_to_input_block(Block* block) { + if (_src_block_ptr == block) { + return Status::OK(); + } + // cast primitive type(PT0) to primitive type(PT1) + size_t idx = 0; + for (size_t i = 0; i < _file_slot_descs.size(); ++i) { + SlotDescriptor* slot_desc = _file_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + ColumnsWithTypeAndName arguments { + arg, + {DataTypeString().create_column_const( + arg.column->size(), remove_nullable(return_type)->get_family_name()), + std::make_shared(), ""}}; + auto func_cast = + SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); + idx = _src_block_name_to_idx[slot_desc->col_name()]; + RETURN_IF_ERROR( + func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size())); + _src_block_ptr->get_by_position(idx).type = std::move(return_type); + } return Status::OK(); } -Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) { +Status VFileScanner::_fill_columns_from_path() { + size_t rows = _src_block_ptr->rows(); const TFileRangeDesc& range = _ranges.at(_next_range - 1); if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { for (const auto& slot_desc : _partition_slot_descs) { @@ -99,7 +177,7 @@ Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r } const std::string& column_from_path = range.columns_from_path[it->second]; - auto doris_column = _block->get_by_name(slot_desc->col_name()).column; + auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column; IColumn* col_ptr = const_cast(doris_column.get()); for (size_t j = 0; j < rows; ++j) { @@ -112,8 +190,127 @@ Status VFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t r return Status::OK(); } +Status VFileScanner::_convert_to_output_block(Block* block) { + if (_src_block_ptr == block) { + return Status::OK(); + } + + block->clear(); + + int ctx_idx = 0; + size_t rows = _src_block.rows(); + auto filter_column = vectorized::ColumnUInt8::create(rows, 1); + auto& filter_map = filter_column->get_data(); + auto origin_column_num = _src_block.columns(); + + for (auto slot_desc : _output_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + + int dest_index = ctx_idx++; + + auto* ctx = _dest_vexpr_ctx[dest_index]; + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); + bool is_origin_column = result_column_id < origin_column_num; + auto column_ptr = + is_origin_column && _src_block_mem_reuse + ? _src_block.get_by_position(result_column_id).column->clone_resized(rows) + : _src_block.get_by_position(result_column_id).column; + + DCHECK(column_ptr != nullptr); + + // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr + // is likely to be nullable + if (LIKELY(column_ptr->is_nullable())) { + auto nullable_column = + reinterpret_cast(column_ptr.get()); + for (int i = 0; i < rows; ++i) { + if (filter_map[i] && nullable_column->is_null_at(i)) { + if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && + !_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index]) + .column->is_null_at(i)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _src_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + auto raw_value = + _src_block.get_by_position(ctx_idx).column->get_data_at( + i); + std::string raw_string = raw_value.to_string(); + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) value is incorrect while strict " + "mode is {}, " + "src value is {}", + slot_desc->col_name(), _strict_mode, raw_string); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } else if (!slot_desc->is_nullable()) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _src_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) values is null while columns is not " + "nullable", + slot_desc->col_name()); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } + } + } + if (!slot_desc->is_nullable()) { + column_ptr = nullable_column->get_nested_column_ptr(); + } + } else if (slot_desc->is_nullable()) { + column_ptr = vectorized::make_nullable(column_ptr); + } + block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + // after do the dest block insert operation, clear _src_block to remove the reference of origin column + if (_src_block_mem_reuse) { + _src_block.clear_column_data(origin_column_num); + } else { + _src_block.clear(); + } + + size_t dest_size = block->columns(); + // do filter + block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column), + std::make_shared(), + "filter column")); + RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size)); + // _counter->num_rows_filtered += rows - dest_block->rows(); + + return Status::OK(); +} + +Status VFileScanner::_pre_filter_src_block() { + if (_pre_conjunct_ctx_ptr) { + auto origin_column_num = _src_block_ptr->columns(); + // filter block + // auto old_rows = _src_block_ptr->rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr, + _src_block_ptr, origin_column_num)); + // _counter->num_rows_unselected += old_rows - _src_block.rows(); + } + return Status::OK(); +} + Status VFileScanner::_get_next_reader() { - //TODO: delete _cur_reader? if (_cur_reader != nullptr) { delete _cur_reader; _cur_reader = nullptr; @@ -133,24 +330,29 @@ Status VFileScanner::_get_next_reader() { file_reader->close(); continue; } - _cur_reader = new ParquetReader(file_reader.release(), _file_slot_descs.size(), - _state->query_options().batch_size, range.start_offset, - range.size, - const_cast(&_state->timezone_obj())); - // _cur_reader.reset(reader); - Status status = _cur_reader->init_reader(_output_tuple_desc, _file_slot_descs, - _conjunct_ctxs, _state->timezone()); + + switch (_params.format_type) { + case TFileFormatType::FORMAT_PARQUET: + _cur_reader = new ParquetReader(file_reader.release(), _file_slot_descs.size(), + _state->query_options().batch_size, range.start_offset, + range.size, + const_cast(&_state->timezone_obj())); + RETURN_IF_ERROR(((ParquetReader*)_cur_reader) + ->init_reader(_output_tuple_desc, _file_slot_descs, + _conjunct_ctxs, _state->timezone())); + break; + default: + std::stringstream error_msg; + error_msg << "Not supported file format " << _params.format_type; + return Status::InternalError(error_msg.str()); + } + _cur_reader_eof = false; - return status; + return Status::OK(); } } Status VFileScanner::_init_expr_ctxes() { - // if (_input_tuple_desc == nullptr) { - // std::stringstream ss; - // ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; - // return Status::InternalError(ss.str()); - // } DCHECK(!_ranges.empty()); std::map full_src_index_map; @@ -170,7 +372,6 @@ Status VFileScanner::_init_expr_ctxes() { ss << "Unknown source slot descriptor, slot_id=" << slot_id; return Status::InternalError(ss.str()); } - _required_slot_descs.emplace_back(it->second); if (slot_info.is_file_slot) { _file_slot_descs.emplace_back(it->second); auto iti = full_src_index_map.find(slot_id); @@ -182,32 +383,7 @@ Status VFileScanner::_init_expr_ctxes() { } } - // _src_tuple = (doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size()); - // _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*)); - // _src_tuple_row->set_tuple(0, _src_tuple); - - // Construct dest slots information - if (config::enable_new_load_scan_node) { - _row_desc.reset(new RowDescriptor(_state->desc_tbl(), - std::vector({_params.src_tuple_id}), - std::vector({false}))); - - // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor - if (!_pre_filter_texprs.empty()) { - // for vectorized, preceding filter exprs should be compounded to one passed from fe. - DCHECK(_pre_filter_texprs.size() == 1); - _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc)); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); - } - - if (_output_tuple_desc == nullptr) { - return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", - _params.dest_tuple_id); - } - + if (_is_load) { bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; for (auto slot_desc : _output_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { @@ -222,7 +398,7 @@ Status VFileScanner::_init_expr_ctxes() { vectorized::VExprContext* ctx = nullptr; RETURN_IF_ERROR( vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); - RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get())); + RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc)); RETURN_IF_ERROR(ctx->open(_state)); _dest_vexpr_ctx.emplace_back(ctx); if (has_slot_id_map) { @@ -235,12 +411,13 @@ Status VFileScanner::_init_expr_ctxes() { return Status::InternalError("No src slot {} in src slot descs", it1->second); } + _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(), + full_src_index_map[_src_slot_it->first]); _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); } } } } - return Status::OK(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index d5e73cc134..3ac8b8bf26 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -32,8 +32,7 @@ class NewFileScanNode; class VFileScanner : public VScanner { public: VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile, - const std::vector& pre_filter_texprs, TFileFormatType::type format); + const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile); Status open(RuntimeState* state) override; @@ -43,15 +42,9 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; - // TODO: Use prefilters to filter input block - Status _filter_input_block(Block* block) { return Status::OK(); } - - // TODO: Convert input block to output block, if needed. - Status _convert_to_output_block(Block* output_block) { return Status::OK(); } - void _init_profiles(RuntimeProfile* profile); - Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows); + Status _fill_columns_from_path(); Status _get_next_reader(); @@ -64,12 +57,9 @@ protected: const std::vector& _ranges; int _next_range; - ParquetReader* _cur_reader; + GenericReader* _cur_reader; bool _cur_reader_eof; - TFileFormatType::type _file_format; - // Used for constructing tuple - std::vector _required_slot_descs; // File source slot descriptors std::vector _file_slot_descs; // File slot id to index map. @@ -78,9 +68,6 @@ protected: std::vector _partition_slot_descs; // Partition slot id to index map std::map _partition_slot_index_map; - std::unique_ptr _row_desc; - doris::Tuple* _src_tuple; - TupleRow* _src_tuple_row; // Mem pool used to allocate _src_tuple and _src_tuple_row std::unique_ptr _mem_pool; @@ -97,11 +84,7 @@ protected: int _rows = 0; int _num_of_columns_from_file; - const std::vector _pre_filter_texprs; - std::vector _dest_vexpr_ctx; - // to filter src tuple directly. - std::unique_ptr _vpre_filter_ctx_ptr; // the map values of dest slot id to src slot desc // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr @@ -110,7 +93,19 @@ protected: bool _src_block_mem_reuse = false; bool _strict_mode; + Block* _src_block_ptr; + Block _src_block; + + // dest slot desc index to src slot desc index + std::unordered_map _dest_slot_to_src_slot_index; + + std::unordered_map _src_block_name_to_idx; + private: Status _init_expr_ctxes(); + Status _init_src_block(Block* block); + Status _cast_to_input_block(Block* block); + Status _pre_filter_src_block(); + Status _convert_to_output_block(Block* block); }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 3ce2a37239..6117d487bc 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -204,9 +204,6 @@ protected: bool _need_agg_finalize = true; - // TODO: should be moved to olap scan node? - std::vector _olap_filters; - // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. std::vector> _stale_vexpr_ctxs; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 4089e5d2b2..f92e291e33 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -47,34 +47,20 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } - _init_input_block(block); { do { // 1. Get input block from scanner { SCOPED_TIMER(_parent->_scan_timer); - RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof)); + RETURN_IF_ERROR(_get_block_impl(state, block, eof)); if (*eof) { - DCHECK(_input_block_ptr->rows() == 0); + DCHECK(block->rows() == 0); break; } - _num_rows_read += _input_block_ptr->rows(); + _num_rows_read += block->rows(); } - // 2. For load, use prefilter to filter the input block first. - { - SCOPED_TIMER(_parent->_prefilter_timer); - RETURN_IF_ERROR(_filter_input_block(_input_block_ptr)); - } - - // 3. For load, convert input block to output block - { - SCOPED_TIMER(_parent->_convert_block_timer); - RETURN_IF_ERROR(_convert_to_output_block(block)); - } - - // 4. Filter the output block finally. - // NOTE that step 2/3 may be skipped, for Query. + // 2. Filter the output block finally. { SCOPED_TIMER(_parent->_filter_timer); RETURN_IF_ERROR(_filter_output_block(block)); @@ -85,38 +71,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { return Status::OK(); } -void VScanner::_init_input_block(Block* output_block) { - if (_input_tuple_desc == nullptr) { - _input_block_ptr = output_block; - return; - } - - // init the input block used for scanner. - _input_block.clear(); - _input_block_ptr = &_input_block; - DCHECK(_input_block.columns() == 0); - - for (auto& slot_desc : _input_tuple_desc->slots()) { - auto data_type = slot_desc->get_data_type_ptr(); - _input_block.insert(vectorized::ColumnWithTypeAndName( - data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } -} - -Status VScanner::_filter_input_block(Block* block) { - // TODO: implement - return Status::OK(); -} - -Status VScanner::_convert_to_output_block(Block* output_block) { - if (_input_block_ptr == output_block) { - return Status::OK(); - } - // TODO: implement - - return Status::OK(); -} - Status VScanner::_filter_output_block(Block* block) { return VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size()); } diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index a5dc5db408..aff7a3a4ed 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -50,16 +50,6 @@ protected: // Update the counters before closing this scanner virtual void _update_counters_before_close(); - // Init the input block if _input_tuple_desc is set. - // Otherwise, use output_block directly. - void _init_input_block(Block* output_block); - - // Use prefilters to filter input block - Status _filter_input_block(Block* block); - - // Convert input block to output block, if needed. - Status _convert_to_output_block(Block* output_block); - // Filter the output block finally. Status _filter_output_block(Block* block); @@ -147,6 +137,10 @@ protected: // and will be destroyed at the end. std::vector _stale_vexpr_ctxs; + // For load scanner + std::unique_ptr _pre_conjunct_ctx_ptr; + std::unique_ptr _src_row_desc; + // num of rows read from scanner int64_t _num_rows_read = 0; diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 173df0365d..25b14ba3a9 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -54,7 +54,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes return Status::NotSupported("Function {} is not implemented", _fn.name.function_name); } VExpr::register_function_context(state, context); - _expr_name = fmt::format("(CAST {}, TO {})", child_name, _target_data_type_name); + _expr_name = fmt::format("(CAST {} TO {})", child_name, _target_data_type_name); return Status::OK(); } @@ -111,4 +111,4 @@ std::string VCastExpr::debug_string() const { out << "}"; return out.str(); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 567bcc093b..723cee4f31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -346,7 +346,7 @@ public abstract class PlanNode extends TreeNode implements PlanStats { return statsDeriveResultList; } - void initCompoundPredicate(Expr expr) { + protected void initCompoundPredicate(Expr expr) { if (expr instanceof CompoundPredicate) { CompoundPredicate compoundPredicate = (CompoundPredicate) expr; compoundPredicate.setType(Type.BOOLEAN); @@ -364,7 +364,7 @@ public abstract class PlanNode extends TreeNode implements PlanStats { } } - Expr convertConjunctsToAndCompoundPredicate(List conjuncts) { + protected Expr convertConjunctsToAndCompoundPredicate(List conjuncts) { List targetConjuncts = Lists.newArrayList(conjuncts); while (targetConjuncts.size() > 1) { List newTargetConjuncts = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java index 739b5f4596..5be1d129f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java @@ -43,7 +43,7 @@ public class BackendPolicy { public void init() throws UserException { Set tags = Sets.newHashSet(); - if (ConnectContext.get().getCurrentUserIdentity() != null) { + if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) { String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); if (tags == UserProperty.INVALID_RESOURCE_TAGS) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index e81753fd36..da9fddc342 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -135,6 +135,11 @@ public class ExternalFileScanNode extends ExternalScanNode { public void init(Analyzer analyzer) throws UserException { super.init(analyzer); + if (!Config.enable_vectorized_load) { + throw new UserException( + "Please set 'enable_vectorized_load=true' in fe.conf to enable external file scan node"); + } + switch (type) { case QUERY: HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); @@ -337,6 +342,12 @@ public class ExternalFileScanNode extends ExternalScanNode { // Need re compute memory layout after set some slot descriptor to nullable srcTupleDesc.computeStatAndMemLayout(); + + if (!preFilterConjuncts.isEmpty()) { + Expr vPreFilterExpr = convertConjunctsToAndCompoundPredicate(preFilterConjuncts); + initCompoundPredicate(vPreFilterExpr); + params.setPreFilterExprs(vPreFilterExpr.treeToThrift()); + } } protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) @@ -377,15 +388,6 @@ public class ExternalFileScanNode extends ExternalScanNode { planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); TFileScanNode fileScanNode = new TFileScanNode(); fileScanNode.setTupleId(desc.getId().asInt()); - if (!preFilterConjuncts.isEmpty()) { - if (Config.enable_vectorized_load && vpreFilterConjunct != null) { - fileScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); - } else { - for (Expr e : preFilterConjuncts) { - fileScanNode.addToPreFilterExprs(e.treeToThrift()); - } - } - } planNode.setFileScanNode(fileScanNode); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 8843ff4462..5f791186a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -102,6 +102,7 @@ public class LoadScanProvider implements FileScanProviderIf { TFileAttributes fileAttributes = new TFileAttributes(); setFileAttributes(ctx.fileGroup, fileAttributes); params.setFileAttributes(fileAttributes); + params.setFileType(fileGroupInfo.getBrokerDesc().getFileType()); ctx.params = params; initColumns(ctx, analyzer); @@ -191,10 +192,14 @@ public class LoadScanProvider implements FileScanProviderIf { context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds, formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); - int numColumnsFromFile = srcSlotIds.size() - context.fileGroup.getColumnNamesFromPath().size(); + int columnCountFromPath = 0; + if (context.fileGroup.getColumnNamesFromPath() != null) { + columnCountFromPath = context.fileGroup.getColumnNamesFromPath().size(); + } + int numColumnsFromFile = srcSlotIds.size() - columnCountFromPath; Preconditions.checkState(numColumnsFromFile >= 0, "srcSlotIds.size is: " + srcSlotIds.size() + ", num columns from path: " - + context.fileGroup.getColumnNamesFromPath().size()); + + columnCountFromPath); context.params.setNumOfColumnsFromFile(numColumnsFromFile); for (int i = 0; i < srcSlotIds.size(); ++i) { TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 584054f611..223a738df0 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -269,8 +269,9 @@ struct TFileScanRangeParams { // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded 11: optional bool strict_mode - 12: list broker_addresses - 13: TFileAttributes file_attributes + 12: optional list broker_addresses + 13: optional TFileAttributes file_attributes + 14: optional Exprs.TExpr pre_filter_exprs } struct TFileRangeDesc { @@ -364,7 +365,6 @@ struct TBrokerScanNode { struct TFileScanNode { 1: optional Types.TTupleId tuple_id - 2: optional list pre_filter_exprs } struct TEsScanNode {