diff --git a/be/src/common/config.h b/be/src/common/config.h index cf24e4de8b..eee4be2d30 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -866,6 +866,9 @@ CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); // so as to avoid occupying the execution thread for a long time. CONF_mInt32(max_fragment_start_wait_time_seconds, "30"); +// Temp config. True to use new file scan node to do load job. Will remove after fully test. +CONF_Bool(enable_new_load_scan_node, "false"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index b6e64e57b9..c7c88771c6 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -170,13 +170,13 @@ Status BaseScanner::init_expr_ctxes() { _dest_expr_ctx.emplace_back(ctx); } if (has_slot_id_map) { - auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); - if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { + auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); + if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { _src_slot_descs_order_by_dest.emplace_back(nullptr); } else { - auto _src_slot_it = src_slot_desc_map.find(it->second); + auto _src_slot_it = src_slot_desc_map.find(it1->second); if (_src_slot_it == std::end(src_slot_desc_map)) { - return Status::InternalError("No src slot {} in src slot descs", it->second); + return Status::InternalError("No src slot {} in src slot descs", it1->second); } _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); } diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index 0679154409..308db1204e 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -47,9 +47,7 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _scan_finished(false), _max_buffered_batches(32), 
_wait_scanner_timer(nullptr), - _runtime_filter_descs(tnode.runtime_filters) { - LOG(WARNING) << "file scan node runtime filter size=" << _runtime_filter_descs.size(); -} + _runtime_filter_descs(tnode.runtime_filters) {} Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScanNode::init(tnode, state)); diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp index 02da0bca2c..8b40b52d40 100644 --- a/be/src/vec/exec/file_text_scanner.cpp +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -49,14 +49,14 @@ FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile, { _init_profiles(profile); - if (params.__isset.text_params) { - auto text_params = params.text_params; - if (text_params.__isset.column_separator_str) { - _value_separator = text_params.column_separator_str; + if (params.file_attributes.__isset.text_params) { + auto text_params = params.file_attributes.text_params; + if (text_params.__isset.column_separator) { + _value_separator = text_params.column_separator; _value_separator_length = _value_separator.length(); } - if (text_params.__isset.line_delimiter_str) { - _line_delimiter = text_params.line_delimiter_str; + if (text_params.__isset.line_delimiter) { + _line_delimiter = text_params.line_delimiter; _line_delimiter_length = _line_delimiter.length(); } } diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp index acc98194d7..b204cd4174 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp @@ -21,14 +21,16 @@ #include "exec/arrow/parquet_reader.h" #include "io/file_factory.h" #include "vec/exec/scan/vscan_node.h" +#include "vec/functions/simple_function_factory.h" #include "vec/utils/arrow_column_to_doris_column.h" namespace doris::vectorized { NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const 
TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + MemTracker* tracker, RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_file_eof(false), _batch(nullptr), @@ -57,7 +59,9 @@ Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, b } *eof = false; - RETURN_IF_ERROR(init_block(block)); + if (!_is_load) { + RETURN_IF_ERROR(init_block(block)); + } // convert arrow batch to block until reach the batch_size while (!_scanner_eof) { // cast arrow type to PT0 and append it to block @@ -122,6 +126,46 @@ Status NewFileArrowScanner::_append_batch_to_block(Block* block) { return _fill_columns_from_path(block, num_elements); } +Status NewFileArrowScanner::_convert_to_output_block(Block* output_block) { + if (!config::enable_new_load_scan_node) { + return Status::OK(); + } + if (_input_block_ptr == output_block) { + return Status::OK(); + } + RETURN_IF_ERROR(_cast_src_block(_input_block_ptr)); + if (LIKELY(_input_block_ptr->rows() > 0)) { + RETURN_IF_ERROR(_materialize_dest_block(output_block)); + } + + return Status::OK(); +} + +// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> +// primitive type(PT1) ==materialize_block==> dest primitive type +Status NewFileArrowScanner::_cast_src_block(Block* block) { + // cast primitive type(PT0) to primitive type(PT1) + for (size_t i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _required_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto& arg = block->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + ColumnsWithTypeAndName arguments { + arg, + {DataTypeString().create_column_const( + 
arg.column->size(), remove_nullable(return_type)->get_family_name()), + std::make_shared(), ""}}; + auto func_cast = + SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); + RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); + block->get_by_position(i).type = std::move(return_type); + } + return Status::OK(); +} + Status NewFileArrowScanner::_next_arrow_batch() { _arrow_batch_cur_idx = 0; // first, init file reader @@ -195,8 +239,10 @@ Status NewFileArrowScanner::_open_next_reader() { NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) { + MemTracker* tracker, RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, + pre_filter_texprs) { // _init_profiles(profile); } @@ -211,8 +257,10 @@ ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* file_reade NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) {} + RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, + pre_filter_texprs) {} ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h index 3a85ce65c5..d1447fbf0c 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.h +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h @@ -29,7 +29,7 @@ class NewFileArrowScanner : public 
NewFileScanner { public: NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); Status open(RuntimeState* state) override; protected: @@ -37,12 +37,15 @@ protected: virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) = 0; + // Convert input block to output block, if needed. + Status _convert_to_output_block(Block* output_block); private: Status _open_next_reader(); Status _next_arrow_batch(); Status _init_arrow_batch_if_necessary(); Status _append_batch_to_block(Block* block); + Status _cast_src_block(Block* block); private: // Reader @@ -56,7 +59,7 @@ class NewFileParquetScanner final : public NewFileArrowScanner { public: NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); ~NewFileParquetScanner() override = default; @@ -72,7 +75,7 @@ class NewFileORCScanner final : public NewFileArrowScanner { public: NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); ~NewFileORCScanner() override = default; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index c978ef0867..53693ac5b7 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -27,7 +27,9 @@ namespace doris::vectorized { NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : VScanNode(pool, tnode, descs), 
_file_scan_node(tnode.file_scan_node) { + : VScanNode(pool, tnode, descs), + _pre_filter_texprs(tnode.file_scan_node.pre_filter_exprs), + _file_scan_node(tnode.file_scan_node) { _output_tuple_id = tnode.file_scan_node.tuple_id; } @@ -95,16 +97,19 @@ VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; case TFileFormatType::FORMAT_ORC: scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; default: scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; } _scanner_pool.add(scanner); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index c523bbb737..1cca33ac37 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -34,6 +34,9 @@ protected: Status _process_conjuncts() override; Status _init_scanners(std::list* scanners) override; +protected: + std::vector _pre_filter_texprs; + private: VScanner* _create_scanner(const TFileScanRange& scan_range); diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp index 4786caeefc..f037595953 100644 --- a/be/src/vec/exec/scan/new_file_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_scanner.cpp @@ -36,12 +36,15 @@ namespace doris::vectorized { NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* 
tracker, - RuntimeProfile* profile) + RuntimeProfile* profile, const std::vector& pre_filter_texprs) : VScanner(state, static_cast(parent), limit, tracker), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), - _profile(profile) {} + _mem_pool(std::make_unique()), + _profile(profile), + _pre_filter_texprs(pre_filter_texprs), + _strict_mode(false) {} Status NewFileScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); @@ -99,6 +102,9 @@ Status NewFileScanner::_init_expr_ctxes() { } } + _src_tuple = (doris::Tuple*)_mem_pool->allocate(src_tuple_desc->byte_size()); + _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*)); + _src_tuple_row->set_tuple(0, _src_tuple); _row_desc.reset(new RowDescriptor(_state->desc_tbl(), std::vector({_params.src_tuple_id}), std::vector({false}))); @@ -114,6 +120,47 @@ Status NewFileScanner::_init_expr_ctxes() { RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); } + // Construct dest slots information + if (config::enable_new_load_scan_node) { + _dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id); + if (_dest_tuple_desc == nullptr) { + return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", + _params.dest_tuple_id); + } + + bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + auto it = _params.expr_of_dest_slot.find(slot_desc->id()); + if (it == std::end(_params.expr_of_dest_slot)) { + return Status::InternalError("No expr for dest slot, id={}, name={}", + slot_desc->id(), slot_desc->col_name()); + } + + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get())); + RETURN_IF_ERROR(ctx->open(_state)); + _dest_vexpr_ctx.emplace_back(ctx); + if (has_slot_id_map) { + auto it1 = 
_params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
+                if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
+                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
+                } else {
+                    auto _src_slot_it = _full_src_slot_map.find(it1->second);
+                    if (_src_slot_it == std::end(_full_src_slot_map)) {
+                        return Status::InternalError("No src slot {} in src slot descs",
+                                                     it1->second);
+                    }
+                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
+                }
+            }
+        }
+    }
+
     return Status::OK();
 }
 
@@ -164,4 +211,118 @@ Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t
     return Status::OK();
 }
 
+Status NewFileScanner::_filter_input_block(Block* block) {
+    if (!config::enable_new_load_scan_node) {
+        return Status::OK();
+    }
+    if (_is_load) {
+        auto origin_column_num = block->columns();
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, block,
+                                                               origin_column_num));
+    }
+    return Status::OK();
+}
+
+// Evaluate every materialized dest slot's expr against the input block and append the
+// result columns to dest_block, then drop the rows whose filter flag was cleared.
+// A row is flagged when an expr yields NULL for a non-nullable dest slot, or (strict mode)
+// when the expr yields NULL although the checked input column is not NULL for that row.
+Status NewFileScanner::_materialize_dest_block(vectorized::Block* dest_block) {
+    // Do vectorized expr here
+    int ctx_idx = 0;
+    size_t rows = _input_block.rows();
+    auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+    auto& filter_map = filter_column->get_data();
+    auto origin_column_num = _input_block.columns();
+
+    for (auto slot_desc : _dest_tuple_desc->slots()) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+
+        auto* ctx = _dest_vexpr_ctx[dest_index];
+        int result_column_id = -1;
+        // PT1 => dest primitive type
+        RETURN_IF_ERROR(ctx->execute(_input_block_ptr, &result_column_id));
+        bool is_origin_column = result_column_id < origin_column_num;
+        auto column_ptr =
+                is_origin_column && _src_block_mem_reuse
+                        ? _input_block.get_by_position(result_column_id).column->clone_resized(rows)
+                        : _input_block.get_by_position(result_column_id).column;
+
+        DCHECK(column_ptr != nullptr);
+
+        // src slots are always nullable, so the column produced by the dest expr is
+        // likely to be nullable as well
+        if (LIKELY(column_ptr->is_nullable())) {
+            auto nullable_column =
+                    reinterpret_cast(column_ptr.get());
+            for (size_t i = 0; i < rows; ++i) {
+                if (filter_map[i] && nullable_column->is_null_at(i)) {
+                    if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
+                        !_input_block.get_by_position(dest_index).column->is_null_at(i)) {
+                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                                [&]() -> std::string {
+                                    return _input_block.dump_one_line(i, _num_of_columns_from_file);
+                                },
+                                [&]() -> std::string {
+                                    // Use dest_index: ctx_idx has already been advanced to the
+                                    // next slot, so it would name the wrong (or a missing) column.
+                                    auto raw_value = _input_block.get_by_position(dest_index)
+                                                             .column->get_data_at(i);
+                                    std::string raw_string = raw_value.to_string();
+                                    fmt::memory_buffer error_msg;
+                                    fmt::format_to(error_msg,
+                                                   "column({}) value is incorrect while strict "
+                                                   "mode is {}, "
+                                                   "src value is {}",
+                                                   slot_desc->col_name(), _strict_mode, raw_string);
+                                    return fmt::to_string(error_msg);
+                                },
+                                &_scanner_eof));
+                        filter_map[i] = false;
+                    } else if (!slot_desc->is_nullable()) {
+                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                                [&]() -> std::string {
+                                    return _input_block.dump_one_line(i, _num_of_columns_from_file);
+                                },
+                                [&]() -> std::string {
+                                    fmt::memory_buffer error_msg;
+                                    fmt::format_to(error_msg,
+                                                   "column({}) values is null while columns is not "
+                                                   "nullable",
+                                                   slot_desc->col_name());
+                                    return fmt::to_string(error_msg);
+                                },
+                                &_scanner_eof));
+                        filter_map[i] = false;
+                    }
+                }
+            }
+            if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
+        } else if (slot_desc->is_nullable()) {
+            column_ptr = vectorized::make_nullable(column_ptr);
+        }
+        dest_block->insert(vectorized::ColumnWithTypeAndName(
+                std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
+    }
+
+    // after inserting into dest_block, clear _input_block to drop its references to the columns
+    if (_src_block_mem_reuse) {
+        _input_block.clear_column_data(origin_column_num);
+    } else {
+        _input_block.clear();
+    }
+
+    size_t dest_size = dest_block->columns();
+    // do filter
+    dest_block->insert(vectorized::ColumnWithTypeAndName(
+            std::move(filter_column), std::make_shared(),
+            "filter column"));
+    RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
+
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h
index 8ef31df706..9beae0ebe0 100644
--- a/be/src/vec/exec/scan/new_file_scanner.h
+++ b/be/src/vec/exec/scan/new_file_scanner.h
@@ -20,6 +20,7 @@
 #include "exec/text_converter.h"
 #include "exprs/bloomfilter_predicate.h"
 #include "exprs/function_filter.h"
+#include "runtime/tuple.h"
 #include "vec/exec/scan/vscanner.h"
 
 namespace doris::vectorized {
@@ -29,12 +30,18 @@ class NewFileScanNode;
 class NewFileScanner : public VScanner {
 public:
     NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                   const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile);
+                   const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile,
+                   const std::vector& pre_filter_texprs);
 
     Status open(RuntimeState* state) override;
 
     Status prepare(VExprContext** vconjunct_ctx_ptr);
 
+protected:
+    // Use prefilters to filter input block
+    Status _filter_input_block(Block* block);
+    Status _materialize_dest_block(vectorized::Block* output_block);
+
 protected:
     virtual void _init_profiles(RuntimeProfile* profile) = 0;
 
@@ -59,6 +66,14 @@ protected:
     // Partition slot id to index map
     std::map _partition_slot_index_map;
     std::unique_ptr _row_desc;
+    doris::Tuple* _src_tuple;
+    TupleRow* _src_tuple_row;
+
+    // Mem pool used to allocate _src_tuple and _src_tuple_row
+ 
std::unique_ptr _mem_pool; + + // Dest tuple descriptor and dest expr context + const TupleDescriptor* _dest_tuple_desc; // Profile RuntimeProfile* _profile; @@ -69,6 +84,19 @@ protected: int _rows = 0; int _num_of_columns_from_file; + const std::vector _pre_filter_texprs; + + std::vector _dest_vexpr_ctx; + // to filter src tuple directly. + std::unique_ptr _vpre_filter_ctx_ptr; + + // the map values of dest slot id to src slot desc + // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr + std::vector _src_slot_descs_order_by_dest; + + bool _src_block_mem_reuse = false; + bool _strict_mode; + private: Status _init_expr_ctxes(); }; diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp index da7205e8d1..a1e2cc9b56 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_text_scanner.cpp @@ -26,8 +26,9 @@ namespace doris::vectorized { NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_line_reader_eof(false), @@ -45,8 +46,9 @@ Status NewFileTextScanner::open(RuntimeState* state) { Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { SCOPED_TIMER(_read_timer); - RETURN_IF_ERROR(init_block(block)); - + if (!_is_load) { + RETURN_IF_ERROR(init_block(block)); + } const int batch_size = state->batch_size(); *eof = false; int current_rows = _rows; @@ -248,4 +250,14 @@ Status NewFileTextScanner::_split_line(const Slice& line) { } return Status::OK(); } + +Status 
NewFileTextScanner::_convert_to_output_block(Block* output_block) { + if (_input_block_ptr == output_block) { + return Status::OK(); + } + if (LIKELY(_input_block_ptr->rows() > 0)) { + RETURN_IF_ERROR(_materialize_dest_block(output_block)); + } + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h index bfc112c870..ab12389450 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.h +++ b/be/src/vec/exec/scan/new_file_text_scanner.h @@ -30,13 +30,14 @@ class NewFileTextScanner : public NewFileScanner { public: NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); Status open(RuntimeState* state) override; protected: void _init_profiles(RuntimeProfile* profile) override {} Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + Status _convert_to_output_block(Block* output_block); private: Status _fill_file_columns(const Slice& line, vectorized::Block* _block); diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 40de8af22c..4089e5d2b2 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -30,6 +30,7 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTra _output_tuple_desc(parent->output_tuple_desc()) { _real_tuple_desc = _input_tuple_desc != nullptr ? 
_input_tuple_desc : _output_tuple_desc; _total_rf_num = _parent->runtime_filter_num(); + _is_load = (_input_tuple_desc != nullptr); } Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 3e65392080..4b356b15ed 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -63,9 +63,6 @@ protected: // Filter the output block finally. Status _filter_output_block(Block* block); - // to filter src tuple directly. - std::unique_ptr _vpre_filter_ctx_ptr; - public: VScanNode* get_parent() { return _parent; } @@ -163,7 +160,7 @@ protected: // File formats based push down predicate std::vector _conjunct_ctxs; - const std::vector _pre_filter_texprs; + bool _is_load = false; }; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 4dcc64f398..22cef54849 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1695,7 +1695,7 @@ public class Config extends ConfigBase { * Should be removed when this feature is ready. */ @ConfField(mutable = false, masterOnly = true) - public static boolean enable_multi_catalog = false; // 1 min + public static boolean enable_multi_catalog = false; @ConfField(mutable = true, masterOnly = false) public static long file_scan_node_split_size = 256 * 1024 * 1024; // 256mb @@ -1761,4 +1761,10 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec + + /** + * Temp config, should be removed when new file scan node is ready. 
+ */ + @ConfField(mutable = true) + public static boolean enable_new_load_scan_node = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index c379b268a2..673408811f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -62,7 +62,7 @@ public class BrokerFileGroup implements Writable { private static final Logger LOG = LogManager.getLogger(BrokerFileGroup.class); private long tableId; - private String valueSeparator; + private String columnSeparator; private String lineDelimiter; // fileFormat may be null, which means format will be decided by file's suffix private String fileFormat; @@ -75,7 +75,7 @@ public class BrokerFileGroup implements Writable { private List fileFieldNames; // partition columnNames - private List columnsFromPath; + private List columnNamesFromPath; // columnExprList includes all fileFieldNames, columnsFromPath and column mappings // this param will be recreated by data desc when the log replay private List columnExprList; @@ -111,7 +111,7 @@ public class BrokerFileGroup implements Writable { // Used for broker table, no need to parse public BrokerFileGroup(BrokerTable table) throws AnalysisException { this.tableId = table.getId(); - this.valueSeparator = Separator.convertSeparator(table.getColumnSeparator()); + this.columnSeparator = Separator.convertSeparator(table.getColumnSeparator()); this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter()); this.isNegative = false; this.filePaths = table.getPaths(); @@ -130,26 +130,22 @@ public class BrokerFileGroup implements Writable { /** * Should used for hive/iceberg/hudi external table. 
*/ - public BrokerFileGroup(long tableId, - String columnSeparator, - String lineDelimiter, - String filePath, - String fileFormat, - List columnsFromPath, - List columnExprList) throws AnalysisException { + public BrokerFileGroup(long tableId, String columnSeparator, String lineDelimiter, String filePath, + String fileFormat, List columnNamesFromPath, List columnExprList) + throws AnalysisException { this.tableId = tableId; - this.valueSeparator = Separator.convertSeparator(columnSeparator); + this.columnSeparator = Separator.convertSeparator(columnSeparator); this.lineDelimiter = Separator.convertSeparator(lineDelimiter); this.isNegative = false; this.filePaths = Lists.newArrayList(filePath); this.fileFormat = fileFormat; - this.columnsFromPath = columnsFromPath; + this.columnNamesFromPath = columnNamesFromPath; this.columnExprList = columnExprList; } public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); - this.columnsFromPath = dataDescription.getColumnsFromPath(); + this.columnNamesFromPath = dataDescription.getColumnsFromPath(); this.columnExprList = dataDescription.getParsedColumnExprList(); this.columnToHadoopFunction = dataDescription.getColumnToHadoopFunction(); this.precedingFilterExpr = dataDescription.getPrecdingFilterExpr(); @@ -202,9 +198,9 @@ public class BrokerFileGroup implements Writable { } // column - valueSeparator = dataDescription.getColumnSeparator(); - if (valueSeparator == null) { - valueSeparator = "\t"; + columnSeparator = dataDescription.getColumnSeparator(); + if (columnSeparator == null) { + columnSeparator = "\t"; } lineDelimiter = dataDescription.getLineDelimiter(); if (lineDelimiter == null) { @@ -213,8 +209,7 @@ public class BrokerFileGroup implements Writable { fileFormat = dataDescription.getFileFormat(); if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase("parquet") - && !fileFormat.equalsIgnoreCase(FeConstants.csv) + if 
(!fileFormat.equalsIgnoreCase("parquet") && !fileFormat.equalsIgnoreCase(FeConstants.csv) && !fileFormat.equalsIgnoreCase("orc") && !fileFormat.equalsIgnoreCase("json") && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) @@ -269,8 +264,8 @@ public class BrokerFileGroup implements Writable { return tableId; } - public String getValueSeparator() { - return valueSeparator; + public String getColumnSeparator() { + return columnSeparator; } public String getLineDelimiter() { @@ -301,8 +296,8 @@ public class BrokerFileGroup implements Writable { return filePaths; } - public List getColumnsFromPath() { - return columnsFromPath; + public List getColumnNamesFromPath() { + return columnNamesFromPath; } public List getColumnExprList() { @@ -428,10 +423,10 @@ public class BrokerFileGroup implements Writable { } sb.append("]"); } - if (columnsFromPath != null) { + if (columnNamesFromPath != null) { sb.append(",columnsFromPath=["); int idx = 0; - for (String name : columnsFromPath) { + for (String name : columnNamesFromPath) { if (idx++ != 0) { sb.append(","); } @@ -450,7 +445,7 @@ public class BrokerFileGroup implements Writable { } sb.append("]"); } - sb.append(",valueSeparator=").append(valueSeparator) + sb.append(",valueSeparator=").append(columnSeparator) .append(",lineDelimiter=").append(lineDelimiter) .append(",fileFormat=").append(fileFormat) .append(",isNegative=").append(isNegative); @@ -476,7 +471,7 @@ public class BrokerFileGroup implements Writable { // tableId out.writeLong(tableId); // valueSeparator - Text.writeString(out, valueSeparator); + Text.writeString(out, columnSeparator); // lineDelimiter Text.writeString(out, lineDelimiter); // isNegative @@ -523,7 +518,7 @@ public class BrokerFileGroup implements Writable { @Deprecated public void readFields(DataInput in) throws IOException { tableId = in.readLong(); - valueSeparator = Text.readString(in); + columnSeparator = Text.readString(in); lineDelimiter = Text.readString(in); isNegative = 
in.readBoolean(); // partitionIds diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 4d11eeb493..569fbe69de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -87,7 +87,6 @@ import org.apache.doris.system.Backend; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.task.PushTask; -import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; @@ -806,14 +805,13 @@ public class Load { * And it must be called in same db lock when planing. */ public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, - Map>> columnToHadoopFunction, - Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, List hiddenColumns, - boolean useVectorizedLoad) throws UserException { + Map>> columnToHadoopFunction, Map exprsByName, + Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad) + throws UserException { rewriteColumns(columnDescs); - initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, - srcTupleDesc, slotDescByName, params, formatType, hiddenColumns, useVectorizedLoad, true); + initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName, + srcSlotIds, formatType, hiddenColumns, useVectorizedLoad, true); } /* @@ -825,11 +823,10 @@ public class Load { * 5. 
init slot descs and expr map for load plan */ private static void initColumns(Table tbl, List columnExprs, - Map>> columnToHadoopFunction, - Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad, - boolean needInitSlotAndAnalyzeExprs) throws UserException { + Map>> columnToHadoopFunction, Map exprsByName, + Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad, + boolean needInitSlotAndAnalyzeExprs) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. // skip the mapping columns not exist in schema @@ -988,7 +985,7 @@ public class Load { slotDesc.setIsNullable(true); } slotDesc.setIsMaterialized(true); - params.addToSrcSlotIds(slotDesc.getId().asInt()); + srcSlotIds.add(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index dedd436a78..535ec50eed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -41,6 +41,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.ExternalFileScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TUniqueId; @@ -143,9 +144,17 @@ public class LoadingTaskPlanner { // Generate plan trees // 1. 
Broker scan node - BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "BrokerScanNode", + ScanNode scanNode; + if (Config.enable_new_load_scan_node) { + scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "FileScanNode"); + ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, + fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo); + } else { + scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode", fileStatusesList, filesAdded); - scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo); + ((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, + loadParallelism, userInfo); + } scanNode.init(analyzer); scanNode.finalize(analyzer); if (Config.enable_vectorized_load) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index d255c458e6..622a82d2af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -506,10 +506,8 @@ public class SparkLoadPendingTask extends LoadTask { fileGroup.isNegative(), columnMappings, where, partitionIds); } else { etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames, - fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(), - fileGroup.getLineDelimiter(), fileGroup.isNegative(), - fileGroup.getFileFormat(), columnMappings, - where, partitionIds); + fileGroup.getColumnNamesFromPath(), fileGroup.getColumnSeparator(), fileGroup.getLineDelimiter(), + fileGroup.isNegative(), fileGroup.getFileFormat(), columnMappings, where, partitionIds); } return etlFileGroup; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index b67e1e91bb..7dcf633598 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -235,11 +235,11 @@ public class BrokerScanNode extends LoadScanNode { context.params = params; BrokerFileGroup fileGroup = context.fileGroup; - params.setColumnSeparator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]); + params.setColumnSeparator(fileGroup.getColumnSeparator().getBytes(Charset.forName("UTF-8"))[0]); params.setLineDelimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]); - params.setColumnSeparatorStr(fileGroup.getValueSeparator()); + params.setColumnSeparatorStr(fileGroup.getColumnSeparator()); params.setLineDelimiterStr(fileGroup.getLineDelimiter()); - params.setColumnSeparatorLength(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8")).length); + params.setColumnSeparatorLength(fileGroup.getColumnSeparator().getBytes(Charset.forName("UTF-8")).length); params.setLineDelimiterLength(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8")).length); params.setStrictMode(strictMode); params.setProperties(brokerDesc.getProperties()); @@ -283,9 +283,8 @@ public class BrokerScanNode extends LoadScanNode { } if (targetTable != null) { - Load.initColumns(targetTable, columnDescs, - context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, - context.srcTupleDescriptor, context.slotDescByName, context.params, + Load.initColumns(targetTable, columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, + analyzer, context.srcTupleDescriptor, context.slotDescByName, context.params.getSrcSlotIds(), formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); } } @@ -514,7 +513,7 @@ public class BrokerScanNode 
extends LoadScanNode { String headerType = getHeaderType(context.fileGroup.getFileFormat()); TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, - context.fileGroup.getColumnsFromPath()); + context.fileGroup.getColumnNamesFromPath()); int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 84d388fa34..0fd3db091f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -142,9 +142,12 @@ public class StreamLoadScanNode extends LoadScanNode { } } - Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, - exprsByName, analyzer, srcTupleDesc, slotDescByName, params, - taskInfo.getFormatType(), taskInfo.getHiddenColumns(), VectorizedUtil.isVectorized()); + if (params.getSrcSlotIds() == null) { + params.setSrcSlotIds(new java.util.ArrayList()); + } + Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, exprsByName, analyzer, srcTupleDesc, + slotDescByName, params.getSrcSlotIds(), taskInfo.getFormatType(), taskInfo.getHiddenColumns(), + VectorizedUtil.isVectorized()); // analyze where statement initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java new file mode 100644 index 0000000000..739b5f4596 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.UserProperty; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; + +public class BackendPolicy { + private static final Logger LOG = LogManager.getLogger(BackendPolicy.class); + private final List backends = Lists.newArrayList(); + + private int nextBe = 0; + + public void init() throws UserException { + Set tags = Sets.newHashSet(); + if (ConnectContext.get().getCurrentUserIdentity() != null) { + String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); + tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + if (tags == UserProperty.INVALID_RESOURCE_TAGS) { + throw new UserException("No valid 
resource tag for user: " + qualifiedUser); + } + } else { + LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); + } + + // scan node is used for query + BeSelectionPolicy policy = new BeSelectionPolicy.Builder() + .needQueryAvailable() + .needLoadAvailable() + .addTags(tags) + .build(); + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (policy.isMatch(be)) { + backends.add(be); + } + } + if (backends.isEmpty()) { + throw new UserException("No available backends"); + } + Random random = new Random(System.currentTimeMillis()); + Collections.shuffle(backends, random); + } + + public Backend getNextBe() { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + return selectedBackend; + } + + public int numBackends() { + return backends.size(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index ce1993a240..e9f2f51f3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -18,60 +18,47 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ArithmeticExpr; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AggregateType; import 
org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.mysql.privilege.UserProperty; +import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.resource.Tag; +import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.system.Backend; -import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; -import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileScanSlotInfo; -import org.apache.doris.thrift.TFileTextScanRangeParams; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.THdfsParams; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; +import com.google.common.base.Preconditions; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; /** * ExternalFileScanNode for the file access type of catalog, now only support @@ -80,295 +67,301 @@ import java.util.Set; public class ExternalFileScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class); - private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001"; + public static class ParamCreateContext { + public BrokerFileGroup fileGroup; + public List conjuncts; - private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; + public TupleDescriptor destTupleDescriptor; - // Just for explain - private int inputSplitsNum = 0; - private long totalFileSize = 0; - - private static class ParamCreateContext { - public TFileScanRangeParams params; + // === Set when init === public TupleDescriptor srcTupleDescriptor; + public Map exprMap; + public Map slotDescByName; + public String timezone; + // === Set when init === + + public TFileScanRangeParams params; + } - private static class BackendPolicy { - private final List backends = Lists.newArrayList(); - - private int nextBe = 0; - - public void init() throws UserException { - Set tags = Sets.newHashSet(); - if (ConnectContext.get().getCurrentUserIdentity() != null) { - String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); - tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - if (tags == UserProperty.INVALID_RESOURCE_TAGS) { - throw new UserException("No valid resource tag for user: " + qualifiedUser); - } - } else { - 
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); - } - - // scan node is used for query - BeSelectionPolicy policy = new BeSelectionPolicy.Builder() - .needQueryAvailable() - .needLoadAvailable() - .addTags(tags) - .build(); - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { - if (policy.isMatch(be)) { - backends.add(be); - } - } - if (backends.isEmpty()) { - throw new UserException("No available backends"); - } - Random random = new Random(System.currentTimeMillis()); - Collections.shuffle(backends, random); - } - - public Backend getNextBe() { - Backend selectedBackend = backends.get(nextBe++); - nextBe = nextBe % backends.size(); - return selectedBackend; - } - - public int numBackends() { - return backends.size(); - } - } - - private static class FileSplitStrategy { - private long totalSplitSize; - private int splitNum; - - FileSplitStrategy() { - this.totalSplitSize = 0; - this.splitNum = 0; - } - - public void update(FileSplit split) { - totalSplitSize += split.getLength(); - splitNum++; - } - - public boolean hasNext() { - return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num; - } - - public void next() { - totalSplitSize = 0; - splitNum = 0; - } + public enum Type { + LOAD, QUERY } + private Type type = Type.QUERY; private final BackendPolicy backendPolicy = new BackendPolicy(); - private final ParamCreateContext context = new ParamCreateContext(); + // Only for load job. + // Save all info about load attributes and files. + // Each DataDescription in a load stmt corresponds to a FileGroupInfo in this list. + private List fileGroupInfos = Lists.newArrayList(); + // For query, there is only one FileScanProvider in this list. + // For load, the num of providers equals the num of file group infos. + private List scanProviders = Lists.newArrayList(); + // For query, there is only one ParamCreateContext in this list. 
+ // For load, the num of ParamCreateContext equals to the num of file group infos. + private List contexts = Lists.newArrayList(); - private final List partitionKeys = new ArrayList<>(); + // Final output of this file scan node + private List scanRangeLocations = Lists.newArrayList(); - private List scanRangeLocations; - - private final HMSExternalTable hmsTable; - - private ExternalFileScanProvider scanProvider; + // For explain + private long inputSplitsNum = 0; + private long totalFileSize = 0; /** - * External file scan node for hms table. + * External file scan node for: + * 1. Query hms table + * 2. Load from file */ - public ExternalFileScanNode( - PlanNodeId id, - TupleDescriptor desc, - String planNodeName) { - + public ExternalFileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE); + } - this.hmsTable = (HMSExternalTable) this.desc.getTable(); - - switch (this.hmsTable.getDlaType()) { - case HUDI: - this.scanProvider = new ExternalHudiScanProvider(this.hmsTable); - break; - case ICEBERG: - this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable); - break; - case HIVE: - this.scanProvider = new ExternalHiveScanProvider(this.hmsTable); - break; - default: - LOG.warn("Unknown table for dla."); + // Only for load job. 
+ public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, + List fileGroups, List> fileStatusesList, int filesAdded, + boolean strictMode, int loadParallelism, UserIdentity userIdentity) { + Preconditions.checkState(fileGroups.size() == fileStatusesList.size()); + for (int i = 0; i < fileGroups.size(); ++i) { + FileGroupInfo fileGroupInfo = new FileGroupInfo(loadJobId, txnId, targetTable, brokerDesc, + fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism, userIdentity); + fileGroupInfos.add(fileGroupInfo); } + this.type = Type.LOAD; } @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - if (hmsTable.isView()) { - throw new AnalysisException(String.format("Querying external view '[%s].%s.%s' is not supported", - hmsTable.getDlaType(), hmsTable.getDbName(), hmsTable.getName())); + + switch (type) { + case QUERY: + HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); + Preconditions.checkNotNull(hmsTable); + + if (hmsTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), + hmsTable.getDbName(), hmsTable.getName())); + } + + FileScanProviderIf scanProvider; + switch (hmsTable.getDlaType()) { + case HUDI: + scanProvider = new HudiScanProvider(hmsTable, desc); + break; + case ICEBERG: + scanProvider = new IcebergScanProvider(hmsTable, desc); + break; + case HIVE: + scanProvider = new HiveScanProvider(hmsTable, desc); + break; + default: + throw new UserException("Unknown table type: " + hmsTable.getDlaType()); + } + this.scanProviders.add(scanProvider); + break; + case LOAD: + for (FileGroupInfo fileGroupInfo : fileGroupInfos) { + this.scanProviders.add(new LoadScanProvider(fileGroupInfo)); + } + break; + default: + throw new UserException("Unknown type: " + type); } + backendPolicy.init(); numNodes = backendPolicy.numBackends(); - initContext(); + + 
initParamCreateContexts(analyzer); } - private void initContext() throws DdlException, MetaNotFoundException { - context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); - context.params = new TFileScanRangeParams(); - if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) { - Map serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); - String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) - ? HIVE_DEFAULT_COLUMN_SEPARATOR - : serDeInfoParams.get("field.delim"); - String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim")) - ? HIVE_DEFAULT_LINE_DELIMITER - : serDeInfoParams.get("line.delim"); + // For each scan provider, create a corresponding ParamCreateContext + private void initParamCreateContexts(Analyzer analyzer) throws UserException { + for (FileScanProviderIf scanProvider : scanProviders) { + ParamCreateContext context = scanProvider.createContext(analyzer); + // set where and preceding filter. + // FIXME(cmy): we should support set different expr for different file group. 
+ initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); + initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); + context.destTupleDescriptor = desc; + context.conjuncts = conjuncts; + this.contexts.add(context); + } + } - TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setLineDelimiterStr(lineDelimiter); - textParams.setColumnSeparatorStr(columnSeparator); + private void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addPreFilterConjuncts(newWhereExpr.getConjuncts()); + } + } - context.params.setTextParams(textParams); + private void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addConjuncts(newWhereExpr.getConjuncts()); + } + } + + private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException { + if (whereExpr == null) { + return null; } - context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); - // Need re compute memory layout after set some slot descriptor to nullable - context.srcTupleDescriptor.computeStatAndMemLayout(); - - Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - - List columns = hmsTable.getBaseSchema(false); - for (Column column : columns) { - SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); - slotDesc.setType(column.getType()); - slotDesc.setIsMaterialized(true); - slotDesc.setIsNullable(true); - slotDesc.setColumn(new Column(column)); - slotDescByName.put(column.getName(), slotDesc); + Map dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for 
(SlotDescriptor slotDescriptor : tupleDesc.getSlots()) { + dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); } - // Hive table must extract partition value from path and hudi/iceberg table keep - // partition field in file. - partitionKeys.addAll(scanProvider.getPathPartitionKeys()); - context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); - for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.isMaterialized()) { - continue; + // substitute SlotRef in filter expression + // where expr must be equal first to transfer some predicates(eg: BetweenPredicate to BinaryPredicate) + Expr newWhereExpr = analyzer.getExprRewriter() + .rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE); + List slots = Lists.newArrayList(); + newWhereExpr.collect(SlotRef.class, slots); + + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + for (SlotRef slot : slots) { + SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName()); + if (slotDesc == null) { + throw new UserException( + "unknown column reference in where statement, reference=" + slot.getColumnName()); } - int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); - - TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); - slotInfo.setSlotId(slotId); - slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); - - context.params.addToRequiredSlots(slotInfo); + smap.getLhs().add(slot); + smap.getRhs().add(new SlotRef(slotDesc)); } + newWhereExpr = newWhereExpr.clone(smap); + newWhereExpr.analyze(analyzer); + if (!newWhereExpr.getType().equals(org.apache.doris.catalog.Type.BOOLEAN)) { + throw new UserException("where statement is not a valid statement return bool"); + } + return newWhereExpr; } @Override public void finalize(Analyzer analyzer) throws UserException { - try { - buildScanRange(); - } catch (IOException e) { - LOG.warn("Finalize failed.", e); - throw new UserException("Finalize failed: " + e.getMessage()); + 
Preconditions.checkState(contexts.size() == scanProviders.size(), + contexts.size() + " vs. " + scanProviders.size()); + for (int i = 0; i < contexts.size(); ++i) { + ParamCreateContext context = contexts.get(i); + finalizeParamsForLoad(context, analyzer); + FileScanProviderIf scanProvider = scanProviders.get(i); + createScanRangeLocations(context, scanProvider); + this.inputSplitsNum += scanProvider.getInputSplitNum(); + this.totalFileSize += scanProvider.getInputFileSize(); } } - // If fileFormat is not null, we use fileFormat instead of check file's suffix - private void buildScanRange() throws UserException, IOException { - scanRangeLocations = Lists.newArrayList(); - List inputSplits = scanProvider.getSplits(conjuncts); - if (inputSplits.isEmpty()) { + protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException { + if (type != Type.LOAD) { return; } - inputSplitsNum = inputSplits.size(); + Map slotDescByName = context.slotDescByName; + Map exprMap = context.exprMap; + TupleDescriptor srcTupleDesc = context.srcTupleDescriptor; + boolean negative = context.fileGroup.isNegative(); - String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); - String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); - String fsName = fullPath.replace(filePath, ""); - context.params.setFileType(scanProvider.getTableFileType()); - context.params.setFormatType(scanProvider.getTableFormatType()); - // set hdfs params for hdfs file type. 
- if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties()); - tHdfsParams.setFsName(fsName); - context.params.setHdfsParams(tHdfsParams); - } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) { - context.params.setProperties(hmsTable.getS3Properties()); + TFileScanRangeParams params = context.params; + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : desc.getSlots()) { + if (!destSlotDesc.isMaterialized()) { + continue; + } + Expr expr = null; + if (exprMap != null) { + expr = exprMap.get(destSlotDesc.getColumn().getName()); + } + if (expr == null) { + SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); + if (srcSlotDesc != null) { + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + // If dest is allow null, we set source to nullable + if (destSlotDesc.getColumn().isAllowNull()) { + srcSlotDesc.setIsNullable(true); + } + expr = new SlotRef(srcSlotDesc); + } else { + Column column = destSlotDesc.getColumn(); + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + } + } + + // check hll_hash + if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + if 
(!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() + .getFunction().equalsIgnoreCase("hll_empty")) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=hll_empty()"); + } + expr.setType(org.apache.doris.catalog.Type.HLL); + } + + checkBitmapCompatibility(analyzer, destSlotDesc, expr); + checkQuantileStateCompatibility(analyzer, destSlotDesc, expr); + + if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { + expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); + expr.analyze(analyzer); + } + expr = castToSlot(destSlotDesc, expr); + params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); } + params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); + params.setDestTupleId(desc.getId().asInt()); + params.setSrcTupleId(srcTupleDesc.getId().asInt()); - TScanRangeLocations curLocations = newLocations(context.params); + // Need re compute memory layout after set some slot descriptor to nullable + srcTupleDesc.computeStatAndMemLayout(); + } - FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); - - for (InputSplit split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - totalFileSize += split.getLength(); - - List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - partitionKeys, false); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.info("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " - + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" - + " loaction: " + 
Joiner.on("|").join(split.getLocations())); - - fileSplitStrategy.update(fileSplit); - // Add a new location when it's can be split - if (fileSplitStrategy.hasNext()) { - scanRangeLocations.add(curLocations); - curLocations = newLocations(context.params); - fileSplitStrategy.next(); + protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isBitmapType()) { + String errorMsg = String.format("bitmap column %s require the function return type is BITMAP", + slotDesc.getColumn().getName()); + throw new AnalysisException(errorMsg); } } - if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { - scanRangeLocations.add(curLocations); + } + + protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isQuantileStateType()) { + String errorMsg = "quantile_state column %s require the function return type is QUANTILE_STATE"; + throw new AnalysisException(String.format(errorMsg, slotDesc.getColumn().getName())); + } } } - private TScanRangeLocations newLocations(TFileScanRangeParams params) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - fileScanRange.setParams(params); - - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend = backendPolicy.getNextBe(); - 
location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - locations.addToLocations(location); - - return locations; - } - - private TFileRangeDesc createFileRangeDesc( - FileSplit fileSplit, - List columnsFromPath) throws DdlException, MetaNotFoundException { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setStartOffset(fileSplit.getStart()); - rangeDesc.setSize(fileSplit.getLength()); - rangeDesc.setColumnsFromPath(columnsFromPath); - - if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) { - rangeDesc.setPath(fileSplit.getPath().toString()); - } - return rangeDesc; + private void createScanRangeLocations(ParamCreateContext context, FileScanProviderIf scanProvider) + throws UserException { + scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } @Override @@ -401,9 +394,8 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(prefix).append("table: ").append(hmsTable.getDbName()).append(".").append(hmsTable.getName()) - .append("\n").append(prefix).append("hms url: ").append(scanProvider.getMetaStoreUrl()).append("\n"); + StringBuilder output = new StringBuilder(prefix); + // output.append(fileTable.getExplainString(prefix)); if (!conjuncts.isEmpty()) { output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java deleted file mode 100644 index 2dd9acd000..0000000000 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java +++ /dev/null @@ -1,186 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.HiveBucketUtil; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.external.hive.util.HiveUtil; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import 
org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * A HiveScanProvider to get information for scan node. - */ -public class ExternalHiveScanProvider implements ExternalFileScanProvider { - private static final Logger LOG = LogManager.getLogger(ExternalHiveScanProvider.class); - - protected HMSExternalTable hmsTable; - - public ExternalHiveScanProvider(HMSExternalTable hmsTable) { - this.hmsTable = hmsTable; - } - - @Override - public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { - TFileFormatType type = null; - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); - String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); - if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { - type = TFileFormatType.FORMAT_PARQUET; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { - type = TFileFormatType.FORMAT_ORC; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { - type = TFileFormatType.FORMAT_CSV_PLAIN; - } - return type; - } - - @Override - public TFileType getTableFileType() throws DdlException, MetaNotFoundException { - String location = hmsTable.getRemoteTable().getSd().getLocation(); - if (location != null && !location.isEmpty()) { - if (location.startsWith("s3a") || location.startsWith("s3n")) { - return TFileType.FILE_S3; - } else if (location.startsWith("hdfs:")) { - return TFileType.FILE_HDFS; - } - } - throw new DdlException("Unknown file type for hms table."); - } - - @Override - public String getMetaStoreUrl() { - return hmsTable.getMetastoreUri(); - } - - @Override - public List getSplits(List exprs) 
- throws IOException, UserException { - String splitsPath = getRemoteHiveTable().getSd().getLocation(); - List partitionKeys = getRemoteHiveTable().getPartitionKeys() - .stream().map(FieldSchema::getName).collect(Collectors.toList()); - List hivePartitions = new ArrayList<>(); - - if (partitionKeys.size() > 0) { - ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr( - exprs, partitionKeys, hmsTable.getName()); - - String metaStoreUris = getMetaStoreUrl(); - hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions( - metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate)); - } - - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); - - Configuration configuration = setConfiguration(); - InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); - List splits; - if (!hivePartitions.isEmpty()) { - try { - splits = hivePartitions.stream().flatMap(x -> { - try { - return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } catch (RuntimeException e) { - throw new IOException(e); - } - } else { - splits = getSplitsByPath(inputFormat, configuration, splitsPath); - } - return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs, - getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(), - getRemoteHiveTable().getParameters()); - } - - private List getSplitsByPath(InputFormat inputFormat, Configuration configuration, - String splitsPath) throws IOException { - JobConf jobConf = new JobConf(configuration); - // For Tez engine, it may generate subdirectoies for "union" query. - // So there may be files and directories in the table directory at the same time. 
eg: - // /user/hive/warehouse/region_tmp_union_all2/000000_0 - // /user/hive/warehouse/region_tmp_union_all2/1 - // /user/hive/warehouse/region_tmp_union_all2/2 - // So we need to set this config to support visit dir recursively. - // Otherwise, getSplits() may throw exception: "Not a file xxx" - // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); - FileInputFormat.setInputPaths(jobConf, splitsPath); - InputSplit[] splits = inputFormat.getSplits(jobConf, 0); - return Lists.newArrayList(splits); - } - - - protected Configuration setConfiguration() { - Configuration conf = new HdfsConfiguration(); - Map dfsProperties = hmsTable.getDfsProperties(); - for (Map.Entry entry : dfsProperties.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - Map s3Properties = hmsTable.getS3Properties(); - for (Map.Entry entry : s3Properties.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - return conf; - } - - @Override - public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { - return hmsTable.getRemoteTable(); - } - - @Override - public Map getTableProperties() throws MetaNotFoundException { - Map properteis = Maps.newHashMap(hmsTable.getRemoteTable().getParameters()); - properteis.putAll(hmsTable.getDfsProperties()); - return properteis; - } - - @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java new file mode 100644 index 0000000000..529f575a47 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import 
org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/** + * FileTable encapsulates a set of files to be scanned into a Table like structure, + * which has table attributes such as Schema, and file attributes such as row and column separators. + * Mainly used to unify HMSTableScan and FileScan + */ +public class FileGroupInfo { + private static final Logger LOG = LogManager.getLogger(FileGroupInfo.class); + + private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001"; + private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; + + private long loadJobId; + private long txnId; + private Table targetTable; + private BrokerDesc brokerDesc; + private BrokerFileGroup fileGroup; + private List fileStatuses; + private int filesAdded; + private boolean strictMode; + private int loadParallelism; + private UserIdentity userIdentity; + // set by getFileStatusAndCalcInstance + long bytesPerInstance = 0; + + public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, + BrokerFileGroup fileGroup, List fileStatuses, int filesAdded, boolean strictMode, + int loadParallelism, UserIdentity userIdentity) { + this.loadJobId = loadJobId; + this.txnId = txnId; + this.targetTable = targetTable; + this.brokerDesc = brokerDesc; + this.fileGroup = fileGroup; + this.fileStatuses = fileStatuses; + this.filesAdded = filesAdded; + this.strictMode = strictMode; + this.loadParallelism = loadParallelism; + this.userIdentity = userIdentity; + } + + public Table getTargetTable() { + return targetTable; + } + + public BrokerDesc getBrokerDesc() { + return brokerDesc; + } + + public BrokerFileGroup getFileGroup() { + return fileGroup; + } + + public List getFileStatuses() { + return fileStatuses; + } + + public boolean isStrictMode() { + return strictMode; + } + + public int getLoadParallelism() { + return loadParallelism; + } + + public UserIdentity getUserIdentity() { + 
return userIdentity; + } + + public String getExplainString(String prefix) { + StringBuilder sb = new StringBuilder(); + sb.append("file scan\n"); + return sb.toString(); + } + + public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy) throws UserException { + if (filesAdded == 0) { + throw new UserException("No source file in this table(" + targetTable.getName() + ")."); + } + + long totalBytes = 0; + for (TBrokerFileStatus fileStatus : fileStatuses) { + totalBytes += fileStatus.size; + } + int numInstances = (int) (totalBytes / Config.min_bytes_per_broker_scanner); + int totalLoadParallelism = loadParallelism * backendPolicy.numBackends(); + numInstances = Math.min(totalLoadParallelism, numInstances); + numInstances = Math.min(numInstances, Config.max_broker_concurrency); + numInstances = Math.max(1, numInstances); + + bytesPerInstance = totalBytes / numInstances + 1; + if (bytesPerInstance > Config.max_bytes_per_broker_scanner) { + throw new UserException("Scan bytes per file scanner exceed limit: " + Config.max_bytes_per_broker_scanner); + } + LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance); + } + + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy); + long curInstanceBytes = 0; + long curFileOffset = 0; + for (int i = 0; i < fileStatuses.size(); ) { + TBrokerFileStatus fileStatus = fileStatuses.get(i); + long leftBytes = fileStatus.size - curFileOffset; + long tmpBytes = curInstanceBytes + leftBytes; + // header_type + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnNamesFromPath()); + if (tmpBytes > bytesPerInstance) { + // Now only support split plain text + 
if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) + || formatType == TFileFormatType.FORMAT_JSON) { + long rangeBytes = bytesPerInstance - curInstanceBytes; + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes, + columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset += rangeBytes; + } else { + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, + columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset = 0; + i++; + } + + // New one scan + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, brokerDesc, backendPolicy); + curInstanceBytes = 0; + + } else { + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset = 0; + curInstanceBytes += leftBytes; + i++; + } + } + + // Put the last file + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().isSetRanges()) { + scanRangeLocations.add(curLocations); + } + } + + protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDesc brokerDesc, + BackendPolicy backendPolicy) throws UserException { + + Backend selectedBackend = backendPolicy.getNextBe(); + + // Generate one file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + + if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { + FsBroker broker = null; + try { + broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerDesc.getName(), selectedBackend.getHost()); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } else { + params.setBrokerAddresses(new ArrayList<>()); + } + 
fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private String getHeaderType(String formatType) { + if (formatType != null) { + if (formatType.toLowerCase().equals(FeConstants.csv_with_names) || formatType.toLowerCase() + .equals(FeConstants.csv_with_names_and_types)) { + return formatType; + } + } + return ""; + } + + private TFileFormatType formatType(String fileFormat, String path) throws UserException { + if (fileFormat != null) { + if (fileFormat.toLowerCase().equals("parquet")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (fileFormat.toLowerCase().equals("orc")) { + return TFileFormatType.FORMAT_ORC; + } else if (fileFormat.toLowerCase().equals("json")) { + return TFileFormatType.FORMAT_JSON; + // csv/csv_with_name/csv_with_names_and_types treat as csv format + } else if (fileFormat.toLowerCase().equals(FeConstants.csv) || fileFormat.toLowerCase() + .equals(FeConstants.csv_with_names) || fileFormat.toLowerCase() + .equals(FeConstants.csv_with_names_and_types) + // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. 
+ || fileFormat.toLowerCase().equals(FeConstants.text)) { + return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); + } + } + + String lowerCasePath = path.toLowerCase(); + if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerCasePath.endsWith(".gz")) { + return TFileFormatType.FORMAT_CSV_GZ; + } else if (lowerCasePath.endsWith(".bz2")) { + return TFileFormatType.FORMAT_CSV_BZ2; + } else if (lowerCasePath.endsWith(".lz4")) { + return TFileFormatType.FORMAT_CSV_LZ4FRAME; + } else if (lowerCasePath.endsWith(".lzo")) { + return TFileFormatType.FORMAT_CSV_LZOP; + } else if (lowerCasePath.endsWith(".deflate")) { + return TFileFormatType.FORMAT_CSV_DEFLATE; + } else { + return TFileFormatType.FORMAT_CSV_PLAIN; + } + } + + private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes, + List columnsFromPath) { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setPath(fileStatus.path); + rangeDesc.setStartOffset(curFileOffset); + rangeDesc.setSize(rangeBytes); + rangeDesc.setColumnsFromPath(columnsFromPath); + return rangeDesc; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java new file mode 100644 index 0000000000..700d8be098 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TScanRangeLocations; + +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public interface FileScanProviderIf { + // Return parquet/orc/text, etc. + TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException; + + // Return S3/HDSF, etc. + TFileType getLocationType() throws DdlException, MetaNotFoundException; + + // Return file list + List getSplits(List exprs) throws IOException, UserException; + + // return properties for S3/HDFS, etc. 
+ Map getLocationProperties() throws MetaNotFoundException, DdlException; + + List getPathPartitionKeys() throws DdlException, MetaNotFoundException; + + ParamCreateContext createContext(Analyzer analyzer) throws UserException; + + void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException; + + int getInputSplitNum(); + + long getInputFileSize(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java new file mode 100644 index 0000000000..83a6d3d49c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.common.Config; + +import org.apache.hadoop.mapred.FileSplit; + +public class FileSplitStrategy { + private long totalSplitSize; + private int splitNum; + + FileSplitStrategy() { + this.totalSplitSize = 0; + this.splitNum = 0; + } + + public void update(FileSplit split) { + totalSplitSize += split.getLength(); + splitNum++; + } + + public boolean hasNext() { + return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num; + } + + public void next() { + totalSplitSize = 0; + splitNum = 0; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java index 36e11d8845..b602722a31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java @@ -17,35 +17,20 @@ package org.apache.doris.planner.external; -import org.apache.doris.analysis.Expr; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.mapred.InputSplit; -import java.io.IOException; -import java.util.List; import java.util.Map; /** - * An interface for file scan node to get the need information. + * An interface for hms table scan node to get the need information. 
*/ -public interface ExternalFileScanProvider { - TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException; - - TFileType getTableFileType() throws DdlException, MetaNotFoundException; - +public interface HMSTableScanProviderIf extends FileScanProviderIf { String getMetaStoreUrl(); - List getSplits(List exprs) throws IOException, UserException; - Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; Map getTableProperties() throws MetaNotFoundException; - - List getPathPartitionKeys() throws DdlException, MetaNotFoundException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java new file mode 100644 index 0000000000..ac72f7a274 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveBucketUtil; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; 
+import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A HiveScanProvider to get information for scan node. + */ +public class HiveScanProvider implements HMSTableScanProviderIf { + private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class); + + protected HMSExternalTable hmsTable; + + protected int inputSplitNum = 0; + protected long inputFileSize = 0; + protected final TupleDescriptor desc; + + public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + this.hmsTable = hmsTable; + this.desc = desc; + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + TFileFormatType type = null; + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); + if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { + type = TFileFormatType.FORMAT_ORC; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { + type = TFileFormatType.FORMAT_CSV_PLAIN; + } + return type; + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + String location = hmsTable.getRemoteTable().getSd().getLocation(); + if (location != null && !location.isEmpty()) { + if (location.startsWith("s3a") || location.startsWith("s3n")) { + return TFileType.FILE_S3; + } else if (location.startsWith("hdfs:")) { + return TFileType.FILE_HDFS; + } + } + throw new 
DdlException("Unknown file location for hms table."); + } + + @Override + public String getMetaStoreUrl() { + return hmsTable.getMetastoreUri(); + } + + @Override + public List getSplits(List exprs) throws IOException, UserException { + String splitsPath = getRemoteHiveTable().getSd().getLocation(); + List partitionKeys = getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toList()); + List hivePartitions = new ArrayList<>(); + + if (partitionKeys.size() > 0) { + ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(exprs, + partitionKeys, hmsTable.getName()); + + String metaStoreUris = getMetaStoreUrl(); + hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, getRemoteHiveTable(), + hivePartitionPredicate)); + } + + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + + Configuration configuration = setConfiguration(); + InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); + List splits; + if (!hivePartitions.isEmpty()) { + try { + splits = hivePartitions.stream().flatMap(x -> { + try { + return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } catch (RuntimeException e) { + throw new IOException(e); + } + } else { + splits = getSplitsByPath(inputFormat, configuration, splitsPath); + } + return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs, + getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(), + getRemoteHiveTable().getParameters()); + } + + private List getSplitsByPath(InputFormat inputFormat, Configuration configuration, + String splitsPath) throws IOException { + JobConf jobConf = new JobConf(configuration); + // For Tez engine, it may generate subdirectories for "union" query. 
+ // So there may be files and directories in the table directory at the same time. eg: + // /user/hive/warehouse/region_tmp_union_all2/000000_0 + // /user/hive/warehouse/region_tmp_union_all2/1 + // /user/hive/warehouse/region_tmp_union_all2/2 + // So we need to set this config to support visiting directories recursively. + // Otherwise, getSplits() may throw exception: "Not a file xxx" + // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 + jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); + FileInputFormat.setInputPaths(jobConf, splitsPath); + InputSplit[] splits = inputFormat.getSplits(jobConf, 0); + return Lists.newArrayList(splits); + } + + + protected Configuration setConfiguration() { + Configuration conf = new HdfsConfiguration(); + Map dfsProperties = hmsTable.getDfsProperties(); + for (Map.Entry entry : dfsProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + Map s3Properties = hmsTable.getS3Properties(); + for (Map.Entry entry : s3Properties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + @Override + public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { + return hmsTable.getRemoteTable(); + } + + @Override + public Map getTableProperties() throws MetaNotFoundException { + // TODO: implement it when we really need properties from remote table. 
+ return Maps.newHashMap(); + } + + @Override + public Map getLocationProperties() throws MetaNotFoundException, DdlException { + TFileType locationType = getLocationType(); + if (locationType == TFileType.FILE_S3) { + return hmsTable.getS3Properties(); + } else if (locationType == TFileType.FILE_HDFS) { + return hmsTable.getDfsProperties(); + } else { + return Maps.newHashMap(); + } + } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + } + + @Override + public ParamCreateContext createContext(Analyzer analyzer) throws UserException { + ParamCreateContext context = new ParamCreateContext(); + context.params = new TFileScanRangeParams(); + context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); + context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); + context.fileGroup = new BrokerFileGroup(hmsTable.getId(), + hmsTable.getRemoteTable().getSd().getLocation(), hmsTable.getRemoteTable().getSd().getInputFormat()); + Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + List columns = hmsTable.getBaseSchema(false); + for (Column column : columns) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); + slotDesc.setType(column.getType()); + slotDesc.setIsMaterialized(true); + slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(column)); + slotDescByName.put(column.getName(), slotDesc); + } + // Need re compute memory layout after set some slot descriptor to nullable + context.srcTupleDescriptor.computeStatAndMemLayout(); + + // Hive table must extract partition value from path and hudi/iceberg table keep + // partition field in file. 
+ List partitionKeys = getPathPartitionKeys(); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slotId); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + context.params.addToRequiredSlots(slotInfo); + } + return context; + } + + @Override + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + try { + List inputSplits = getSplits(context.conjuncts); + this.inputSplitNum = inputSplits.size(); + if (inputSplits.isEmpty()) { + return; + } + + String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); + String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); + String fsName = fullPath.replace(filePath, ""); + TFileType locationType = getLocationType(); + context.params.setFileType(locationType); + context.params.setFormatType(getFileFormatType()); + // set hdfs params for hdfs file type. 
Map locationProperties = getLocationProperties(); + if (locationType == TFileType.FILE_HDFS) { + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + context.params.setHdfsParams(tHdfsParams); + } else if (locationType == TFileType.FILE_S3) { + context.params.setProperties(locationProperties); + } + + TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); + + FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); + + for (InputSplit split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; + List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + getPathPartitionKeys(), false); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + LOG.info( + "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" + + " loaction: " + Joiner.on("|").join(split.getLocations())); + + fileSplitStrategy.update(fileSplit); + // Add a new location when it can be split + if (fileSplitStrategy.hasNext()) { + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, backendPolicy); + fileSplitStrategy.next(); + } + this.inputFileSize += fileSplit.getLength(); + } + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { + scanRangeLocations.add(curLocations); + } + } catch (IOException e) { + throw new UserException(e); + } + } + + @Override + public int getInputSplitNum() { + return this.inputSplitNum; + } + + @Override + public long getInputFileSize() { + return this.inputFileSize; + } + + private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) { + // Generate one file scan range + 
TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend selectedBackend = backendPolicy.getNextBe(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List columnsFromPath) + throws DdlException, MetaNotFoundException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setColumnsFromPath(columnsFromPath); + + if (getLocationType() == TFileType.FILE_HDFS) { + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + } else if (getLocationType() == TFileType.FILE_S3) { + rangeDesc.setPath(fileSplit.getPath().toString()); + } + return rangeDesc; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java similarity index 83% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java index c32a2ac3d7..d4c95b2549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java @@ -17,6 +17,7 @@ package 
org.apache.doris.planner.external; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -29,14 +30,14 @@ import java.util.List; * A file scan provider for hudi. * HudiProvier is extended with hive since they both use input format interface to get the split. */ -public class ExternalHudiScanProvider extends ExternalHiveScanProvider { +public class HudiScanProvider extends HiveScanProvider { - public ExternalHudiScanProvider(HMSExternalTable hmsTable) { - super(hmsTable); + public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + super(hmsTable, desc); } @Override - public TFileFormatType getTableFormatType() throws DdlException { + public TFileFormatType getFileFormatType() throws DdlException { return TFileFormatType.FORMAT_PARQUET; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index 10fd85b242..f82e277f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -45,14 +46,14 @@ import java.util.Map; /** * A file scan provider for iceberg. 
*/ -public class ExternalIcebergScanProvider extends ExternalHiveScanProvider { +public class IcebergScanProvider extends HiveScanProvider { - public ExternalIcebergScanProvider(HMSExternalTable hmsTable) { - super(hmsTable); + public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + super(hmsTable, desc); } @Override - public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { TFileFormatType type; String icebergFormat = getRemoteHiveTable().getParameters() diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java new file mode 100644 index 0000000000..8843ff4462 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.task.LoadTaskInfo; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class LoadScanProvider implements FileScanProviderIf { + + FileGroupInfo fileGroupInfo; + + public LoadScanProvider(FileGroupInfo fileGroupInfo) { + this.fileGroupInfo = fileGroupInfo; + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + return null; + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + return 
null; + } + + @Override + public List getSplits(List exprs) throws IOException, UserException { + return null; + } + + @Override + public Map getLocationProperties() throws MetaNotFoundException, DdlException { + return null; + } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return null; + } + + @Override + public ParamCreateContext createContext(Analyzer analyzer) throws UserException { + ParamCreateContext ctx = new ParamCreateContext(); + ctx.fileGroup = fileGroupInfo.getFileGroup(); + ctx.timezone = analyzer.getTimezone(); + + TFileScanRangeParams params = new TFileScanRangeParams(); + params.setStrictMode(fileGroupInfo.isStrictMode()); + params.setProperties(fileGroupInfo.getBrokerDesc().getProperties()); + if (fileGroupInfo.getBrokerDesc().getFileType() == TFileType.FILE_HDFS) { + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(fileGroupInfo.getBrokerDesc().getProperties()); + params.setHdfsParams(tHdfsParams); + } + TFileAttributes fileAttributes = new TFileAttributes(); + setFileAttributes(ctx.fileGroup, fileAttributes); + params.setFileAttributes(fileAttributes); + ctx.params = params; + + initColumns(ctx, analyzer); + return ctx; + } + + public void setFileAttributes(BrokerFileGroup fileGroup, TFileAttributes fileAttributes) { + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setColumnSeparator(fileGroup.getColumnSeparator()); + textParams.setLineDelimiter(fileGroup.getLineDelimiter()); + fileAttributes.setTextParams(textParams); + fileAttributes.setStripOuterArray(fileGroup.isStripOuterArray()); + fileAttributes.setJsonpaths(fileGroup.getJsonPaths()); + fileAttributes.setJsonRoot(fileGroup.getJsonRoot()); + fileAttributes.setNumAsString(fileGroup.isNumAsString()); + fileAttributes.setFuzzyParse(fileGroup.isFuzzyParse()); + fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine()); + fileAttributes.setReadByColumnDef(true); + 
fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat())); + } + + private String getHeaderType(String formatType) { + if (formatType != null) { + if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase( + FeConstants.csv_with_names_and_types)) { + return formatType; + } + } + return ""; + } + + @Override + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + Preconditions.checkNotNull(fileGroupInfo); + fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy); + fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations); + } + + @Override + public int getInputSplitNum() { + return fileGroupInfo.getFileStatuses().size(); + } + + @Override + public long getInputFileSize() { + long res = 0; + for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) { + res += fileStatus.getSize(); // sum every file, not just the last one + } + return res; + } + + /** + * This method is used to calculate the slotDescByName and exprMap. + * The expr in exprMap is analyzed in this function. + * The smap of slot which belongs to expr will be analyzed by src desc. + * slotDescByName: the single slot from columns in load stmt + * exprMap: the expr from column mapping in load stmt. 
+ * + * @param context + * @throws UserException + */ + private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException { + context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); + context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + // for load job, column exprs is got from file group + // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns" + LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs(); + // fileGroup.getColumnExprList() contains columns from path + columnDescs.descs = context.fileGroup.getColumnExprList(); + if (context.fileGroup.getMergeType() == LoadTask.MergeType.MERGE) { + columnDescs.descs.add( + ImportColumnDesc.newDeleteSignImportColumnDesc(context.fileGroup.getDeleteCondition())); + } else if (context.fileGroup.getMergeType() == LoadTask.MergeType.DELETE) { + columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); + } + // add columnExpr for sequence column + if (context.fileGroup.hasSequenceCol()) { + columnDescs.descs.add( + new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, context.fileGroup.getSequenceCol()))); + } + List srcSlotIds = Lists.newArrayList(); + Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), + context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds, + formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); + + int numColumnsFromFile = srcSlotIds.size() - context.fileGroup.getColumnNamesFromPath().size(); + Preconditions.checkState(numColumnsFromFile >= 0, + "srcSlotIds.size is: " + srcSlotIds.size() + ", num columns from path: " + + context.fileGroup.getColumnNamesFromPath().size()); + context.params.setNumOfColumnsFromFile(numColumnsFromFile); + for 
(int i = 0; i < srcSlotIds.size(); ++i) { + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(srcSlotIds.get(i)); + slotInfo.setIsFileSlot(i < numColumnsFromFile); + context.params.addToRequiredSlots(slotInfo); + } + } + + private TFileFormatType formatType(String fileFormat, String path) throws UserException { + if (fileFormat != null) { + String lowerFileFormat = fileFormat.toLowerCase(); + if (lowerFileFormat.equals("parquet")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerFileFormat.equals("orc")) { + return TFileFormatType.FORMAT_ORC; + } else if (lowerFileFormat.equals("json")) { + return TFileFormatType.FORMAT_JSON; + // csv/csv_with_name/csv_with_names_and_types treat as csv format + } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) + || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) + // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. + || lowerFileFormat.equals(FeConstants.text)) { + return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); + } + } + + String lowerCasePath = path.toLowerCase(); + if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerCasePath.endsWith(".gz")) { + return TFileFormatType.FORMAT_CSV_GZ; + } else if (lowerCasePath.endsWith(".bz2")) { + return TFileFormatType.FORMAT_CSV_BZ2; + } else if (lowerCasePath.endsWith(".lz4")) { + return TFileFormatType.FORMAT_CSV_LZ4FRAME; + } else if (lowerCasePath.endsWith(".lzo")) { + return TFileFormatType.FORMAT_CSV_LZOP; + } else if (lowerCasePath.endsWith(".deflate")) { + return TFileFormatType.FORMAT_CSV_DEFLATE; + } else { + return TFileFormatType.FORMAT_CSV_PLAIN; + } + } +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 13c8452293..584054f611 100644 --- 
a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -217,8 +217,8 @@ struct TEsScanRange { } struct TFileTextScanRangeParams { - 1: optional string column_separator_str; - 2: optional string line_delimiter_str; + 1: optional string column_separator; + 2: optional string line_delimiter; } struct TFileScanSlotInfo { @@ -226,20 +226,51 @@ struct TFileScanSlotInfo { 2: optional bool is_file_slot; } -struct TFileScanRangeParams { - 1: optional Types.TFileType file_type; - 2: optional TFileFormatType format_type; - // use src_tuple_id to get all slots from src table include both file slot and partition slot. - 3: optional Types.TTupleId src_tuple_id; - // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot - 4: optional i32 num_of_columns_from_file; - // all selected slots which may compose from file and partiton value. - 5: optional list required_slots; +// describe how to read file +struct TFileAttributes { + 1: optional TFileTextScanRangeParams text_params; + // it's useful when format_type == FORMAT_JSON + 2: optional bool strip_outer_array; + 3: optional string jsonpaths; + 4: optional string json_root; + 5: optional bool num_as_string; + 6: optional bool fuzzy_parse; + 7: optional bool read_json_by_line; + // Whether to read lines by column definition, only for Hive + 8: optional bool read_by_column_def; + // csv with header type + 9: optional string header_type; +} - 6: optional THdfsParams hdfs_params; - 7: optional TFileTextScanRangeParams text_params; - // properties for file such as s3 information - 8: optional map properties; +struct TFileScanRangeParams { + 1: optional Types.TFileType file_type; + 2: optional TFileFormatType format_type; + // If this is for load job, src points to the source table and dest points to the doris table. + // If this is for query, only dest_tuple_id is set, including both file slot and partition slot. 
+ 3: optional Types.TTupleId src_tuple_id; + 4: optional Types.TTupleId dest_tuple_id + // num_of_columns_from_file can split the all_file_slot and all_partition_slot + 5: optional i32 num_of_columns_from_file; + // all selected slots which may compose from file and partition value. + 6: optional list required_slots; + + 7: optional THdfsParams hdfs_params; + // properties for file such as s3 information + 8: optional map properties; + + // The convert expr map for load job + // dest slot id -> expr + 9: optional map expr_of_dest_slot + // This is the mapping of dest slot id and src slot id in load expr + // It excludes the slot id which has the transform expr + 10: optional map dest_sid_to_src_sid_without_trans + + // strictMode is a boolean + // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded + 11: optional bool strict_mode + + 12: list broker_addresses + 13: TFileAttributes file_attributes } struct TFileRangeDesc { @@ -253,7 +284,9 @@ struct TFileRangeDesc { 4: optional list columns_from_path; } -// HDFS file scan range +// TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. +// TFileScanRangeParams: describe how to read and convert file +// list: file location and range struct TFileScanRange { 1: optional list ranges 2: optional TFileScanRangeParams params