From dc80a993bcd3ed7f102f5ba7f7ddf2416538d824 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Tue, 13 Sep 2022 13:36:34 +0800 Subject: [PATCH] [feature-wip](new-scan) New load scanner. (#12275) Related pr: https://github.com/apache/doris/pull/11582 https://github.com/apache/doris/pull/12048 Using new file scan node and new scheduling framework to do the load job, replace the old broker scan node. The load part (Be part) is work in progress. Query part (Fe) has been tested using tpch benchmark. Please review only the FE code in this pr, BE code has been disabled by enable_new_load_scan_node configuration. Will send another pr soon to fix be side code. --- be/src/common/config.h | 3 + be/src/exec/base_scanner.cpp | 8 +- be/src/vec/exec/file_scan_node.cpp | 4 +- be/src/vec/exec/file_text_scanner.cpp | 12 +- .../vec/exec/scan/new_file_arrow_scanner.cpp | 62 +- be/src/vec/exec/scan/new_file_arrow_scanner.h | 9 +- be/src/vec/exec/scan/new_file_scan_node.cpp | 13 +- be/src/vec/exec/scan/new_file_scan_node.h | 3 + be/src/vec/exec/scan/new_file_scanner.cpp | 159 +++++- be/src/vec/exec/scan/new_file_scanner.h | 30 +- .../vec/exec/scan/new_file_text_scanner.cpp | 20 +- be/src/vec/exec/scan/new_file_text_scanner.h | 3 +- be/src/vec/exec/scan/vscanner.cpp | 1 + be/src/vec/exec/scan/vscanner.h | 5 +- .../java/org/apache/doris/common/Config.java | 8 +- .../apache/doris/load/BrokerFileGroup.java | 49 +- .../main/java/org/apache/doris/load/Load.java | 25 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 13 +- .../load/loadv2/SparkLoadPendingTask.java | 6 +- .../apache/doris/planner/BrokerScanNode.java | 13 +- .../doris/planner/StreamLoadScanNode.java | 9 +- .../doris/planner/external/BackendPolicy.java | 83 +++ .../external/ExternalFileScanNode.java | 530 +++++++++--------- .../external/ExternalHiveScanProvider.java | 186 ------ .../doris/planner/external/FileGroupInfo.java | 292 ++++++++++ .../planner/external/FileScanProviderIf.java | 
59 ++ .../planner/external/FileSplitStrategy.java | 46 ++ ...vider.java => HMSTableScanProviderIf.java} | 19 +- .../planner/external/HiveScanProvider.java | 370 ++++++++++++ ...canProvider.java => HudiScanProvider.java} | 9 +- ...Provider.java => IcebergScanProvider.java} | 9 +- .../planner/external/LoadScanProvider.java | 244 ++++++++ gensrc/thrift/PlanNodes.thrift | 65 ++- 33 files changed, 1774 insertions(+), 593 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java rename fe/fe-core/src/main/java/org/apache/doris/planner/external/{ExternalFileScanProvider.java => HMSTableScanProviderIf.java} (63%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java rename fe/fe-core/src/main/java/org/apache/doris/planner/external/{ExternalHudiScanProvider.java => HudiScanProvider.java} (83%) rename fe/fe-core/src/main/java/org/apache/doris/planner/external/{ExternalIcebergScanProvider.java => IcebergScanProvider.java} (93%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java diff --git a/be/src/common/config.h b/be/src/common/config.h index cf24e4de8b..eee4be2d30 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -866,6 +866,9 @@ CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); // so as to avoid occupying the execution thread for a long time. CONF_mInt32(max_fragment_start_wait_time_seconds, "30"); +// Temp config. True to use new file scan node to do load job. Will remove after fully test. 
+CONF_Bool(enable_new_load_scan_node, "false"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index b6e64e57b9..c7c88771c6 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -170,13 +170,13 @@ Status BaseScanner::init_expr_ctxes() { _dest_expr_ctx.emplace_back(ctx); } if (has_slot_id_map) { - auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); - if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { + auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); + if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { _src_slot_descs_order_by_dest.emplace_back(nullptr); } else { - auto _src_slot_it = src_slot_desc_map.find(it->second); + auto _src_slot_it = src_slot_desc_map.find(it1->second); if (_src_slot_it == std::end(src_slot_desc_map)) { - return Status::InternalError("No src slot {} in src slot descs", it->second); + return Status::InternalError("No src slot {} in src slot descs", it1->second); } _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); } diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index 0679154409..308db1204e 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -47,9 +47,7 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _scan_finished(false), _max_buffered_batches(32), _wait_scanner_timer(nullptr), - _runtime_filter_descs(tnode.runtime_filters) { - LOG(WARNING) << "file scan node runtime filter size=" << _runtime_filter_descs.size(); -} + _runtime_filter_descs(tnode.runtime_filters) {} Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScanNode::init(tnode, state)); diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp index 02da0bca2c..8b40b52d40 100644 --- 
a/be/src/vec/exec/file_text_scanner.cpp +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -49,14 +49,14 @@ FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile, { _init_profiles(profile); - if (params.__isset.text_params) { - auto text_params = params.text_params; - if (text_params.__isset.column_separator_str) { - _value_separator = text_params.column_separator_str; + if (params.file_attributes.__isset.text_params) { + auto text_params = params.file_attributes.text_params; + if (text_params.__isset.column_separator) { + _value_separator = text_params.column_separator; _value_separator_length = _value_separator.length(); } - if (text_params.__isset.line_delimiter_str) { - _line_delimiter = text_params.line_delimiter_str; + if (text_params.__isset.line_delimiter) { + _line_delimiter = text_params.line_delimiter; _line_delimiter_length = _line_delimiter.length(); } } diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp index acc98194d7..b204cd4174 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp @@ -21,14 +21,16 @@ #include "exec/arrow/parquet_reader.h" #include "io/file_factory.h" #include "vec/exec/scan/vscan_node.h" +#include "vec/functions/simple_function_factory.h" #include "vec/utils/arrow_column_to_doris_column.h" namespace doris::vectorized { NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + MemTracker* tracker, RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_file_eof(false), _batch(nullptr), @@ -57,7 +59,9 @@ Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, 
Block* block, b } *eof = false; - RETURN_IF_ERROR(init_block(block)); + if (!_is_load) { + RETURN_IF_ERROR(init_block(block)); + } // convert arrow batch to block until reach the batch_size while (!_scanner_eof) { // cast arrow type to PT0 and append it to block @@ -122,6 +126,46 @@ Status NewFileArrowScanner::_append_batch_to_block(Block* block) { return _fill_columns_from_path(block, num_elements); } +Status NewFileArrowScanner::_convert_to_output_block(Block* output_block) { + if (!config::enable_new_load_scan_node) { + return Status::OK(); + } + if (_input_block_ptr == output_block) { + return Status::OK(); + } + RETURN_IF_ERROR(_cast_src_block(_input_block_ptr)); + if (LIKELY(_input_block_ptr->rows() > 0)) { + RETURN_IF_ERROR(_materialize_dest_block(output_block)); + } + + return Status::OK(); +} + +// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> +// primitive type(PT1) ==materialize_block==> dest primitive type +Status NewFileArrowScanner::_cast_src_block(Block* block) { + // cast primitive type(PT0) to primitive type(PT1) + for (size_t i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _required_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto& arg = block->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + ColumnsWithTypeAndName arguments { + arg, + {DataTypeString().create_column_const( + arg.column->size(), remove_nullable(return_type)->get_family_name()), + std::make_shared(), ""}}; + auto func_cast = + SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); + RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); + block->get_by_position(i).type = std::move(return_type); + } + return Status::OK(); +} + Status NewFileArrowScanner::_next_arrow_batch() { _arrow_batch_cur_idx = 0; // first, init file reader @@ 
-195,8 +239,10 @@ Status NewFileArrowScanner::_open_next_reader() { NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) { + MemTracker* tracker, RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, + pre_filter_texprs) { // _init_profiles(profile); } @@ -211,8 +257,10 @@ ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* file_reade NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) {} + RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, + pre_filter_texprs) {} ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h index 3a85ce65c5..d1447fbf0c 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.h +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h @@ -29,7 +29,7 @@ class NewFileArrowScanner : public NewFileScanner { public: NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); Status open(RuntimeState* state) override; protected: @@ -37,12 +37,15 @@ protected: virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t 
range_start_offset, int64_t range_size) = 0; + // Convert input block to output block, if needed. + Status _convert_to_output_block(Block* output_block); private: Status _open_next_reader(); Status _next_arrow_batch(); Status _init_arrow_batch_if_necessary(); Status _append_batch_to_block(Block* block); + Status _cast_src_block(Block* block); private: // Reader @@ -56,7 +59,7 @@ class NewFileParquetScanner final : public NewFileArrowScanner { public: NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); ~NewFileParquetScanner() override = default; @@ -72,7 +75,7 @@ class NewFileORCScanner final : public NewFileArrowScanner { public: NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); ~NewFileORCScanner() override = default; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index c978ef0867..53693ac5b7 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -27,7 +27,9 @@ namespace doris::vectorized { NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : VScanNode(pool, tnode, descs), _file_scan_node(tnode.file_scan_node) { + : VScanNode(pool, tnode, descs), + _pre_filter_texprs(tnode.file_scan_node.pre_filter_exprs), + _file_scan_node(tnode.file_scan_node) { _output_tuple_id = tnode.file_scan_node.tuple_id; } @@ -95,16 +97,19 @@ VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, 
scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; case TFileFormatType::FORMAT_ORC: scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; default: scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + _scanner_mem_tracker.get(), runtime_profile(), + _pre_filter_texprs); break; } _scanner_pool.add(scanner); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index c523bbb737..1cca33ac37 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -34,6 +34,9 @@ protected: Status _process_conjuncts() override; Status _init_scanners(std::list* scanners) override; +protected: + std::vector _pre_filter_texprs; + private: VScanner* _create_scanner(const TFileScanRange& scan_range); diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp index 4786caeefc..f037595953 100644 --- a/be/src/vec/exec/scan/new_file_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_scanner.cpp @@ -36,12 +36,15 @@ namespace doris::vectorized { NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) + RuntimeProfile* profile, const std::vector& pre_filter_texprs) : VScanner(state, static_cast(parent), limit, tracker), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), - _profile(profile) {} + _mem_pool(std::make_unique()), + _profile(profile), + _pre_filter_texprs(pre_filter_texprs), + _strict_mode(false) {} Status NewFileScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); @@ -99,6 +102,9 @@ 
Status NewFileScanner::_init_expr_ctxes() { } } + _src_tuple = (doris::Tuple*)_mem_pool->allocate(src_tuple_desc->byte_size()); + _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*)); + _src_tuple_row->set_tuple(0, _src_tuple); _row_desc.reset(new RowDescriptor(_state->desc_tbl(), std::vector({_params.src_tuple_id}), std::vector({false}))); @@ -114,6 +120,47 @@ Status NewFileScanner::_init_expr_ctxes() { RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); } + // Construct dest slots information + if (config::enable_new_load_scan_node) { + _dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id); + if (_dest_tuple_desc == nullptr) { + return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", + _params.dest_tuple_id); + } + + bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + auto it = _params.expr_of_dest_slot.find(slot_desc->id()); + if (it == std::end(_params.expr_of_dest_slot)) { + return Status::InternalError("No expr for dest slot, id={}, name={}", + slot_desc->id(), slot_desc->col_name()); + } + + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get())); + RETURN_IF_ERROR(ctx->open(_state)); + _dest_vexpr_ctx.emplace_back(ctx); + if (has_slot_id_map) { + auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); + if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { + _src_slot_descs_order_by_dest.emplace_back(nullptr); + } else { + auto _src_slot_it = _full_src_slot_map.find(it1->second); + if (_src_slot_it == std::end(_full_src_slot_map)) { + return Status::InternalError("No src slot {} in src slot descs", + it1->second); + } + _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); + } + } + } + 
} + return Status::OK(); } @@ -164,4 +211,112 @@ Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t return Status::OK(); } +Status NewFileScanner::_filter_input_block(Block* block) { + if (!config::enable_new_load_scan_node) { + return Status::OK(); + } + if (_is_load) { + auto origin_column_num = block->columns(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, block, + origin_column_num)); + } + return Status::OK(); +} + +Status NewFileScanner::_materialize_dest_block(vectorized::Block* dest_block) { + // Do vectorized expr here + int ctx_idx = 0; + size_t rows = _input_block.rows(); + auto filter_column = vectorized::ColumnUInt8::create(rows, 1); + auto& filter_map = filter_column->get_data(); + auto origin_column_num = _input_block.columns(); + + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; + + auto* ctx = _dest_vexpr_ctx[dest_index]; + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(_input_block_ptr, &result_column_id)); + bool is_origin_column = result_column_id < origin_column_num; + auto column_ptr = + is_origin_column && _src_block_mem_reuse + ? 
_input_block.get_by_position(result_column_id).column->clone_resized(rows) + : _input_block.get_by_position(result_column_id).column; + + DCHECK(column_ptr != nullptr); + + // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr + // is likely to be nullable + if (LIKELY(column_ptr->is_nullable())) { + auto nullable_column = + reinterpret_cast(column_ptr.get()); + for (int i = 0; i < rows; ++i) { + if (filter_map[i] && nullable_column->is_null_at(i)) { + if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && + !_input_block.get_by_position(dest_index).column->is_null_at(i)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _input_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + auto raw_value = _input_block.get_by_position(ctx_idx) + .column->get_data_at(i); + std::string raw_string = raw_value.to_string(); + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) value is incorrect while strict " + "mode is {}, " + "src value is {}", + slot_desc->col_name(), _strict_mode, raw_string); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } else if (!slot_desc->is_nullable()) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _input_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) values is null while columns is not " + "nullable", + slot_desc->col_name()); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } + } + } + if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr(); + } else if (slot_desc->is_nullable()) { + column_ptr = vectorized::make_nullable(column_ptr); + } + dest_block->insert(vectorized::ColumnWithTypeAndName( + std::move(column_ptr), slot_desc->get_data_type_ptr(), 
slot_desc->col_name())); + } + + // after do the dest block insert operation, clear _src_block to remove the reference of origin column + if (_src_block_mem_reuse) { + _input_block.clear_column_data(origin_column_num); + } else { + _input_block.clear(); + } + + size_t dest_size = dest_block->columns(); + // do filter + dest_block->insert(vectorized::ColumnWithTypeAndName( + std::move(filter_column), std::make_shared(), + "filter column")); + RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size)); + + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h index 8ef31df706..9beae0ebe0 100644 --- a/be/src/vec/exec/scan/new_file_scanner.h +++ b/be/src/vec/exec/scan/new_file_scanner.h @@ -20,6 +20,7 @@ #include "exec/text_converter.h" #include "exprs/bloomfilter_predicate.h" #include "exprs/function_filter.h" +#include "runtime/tuple.h" #include "vec/exec/scan/vscanner.h" namespace doris::vectorized { @@ -29,12 +30,18 @@ class NewFileScanNode; class NewFileScanner : public VScanner { public: NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile); + const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile, + const std::vector& pre_filter_texprs); Status open(RuntimeState* state) override; Status prepare(VExprContext** vconjunct_ctx_ptr); +protected: + // Use prefilters to filter input block + Status _filter_input_block(Block* block); + Status _materialize_dest_block(vectorized::Block* output_block); + protected: virtual void _init_profiles(RuntimeProfile* profile) = 0; @@ -59,6 +66,14 @@ protected: // Partition slot id to index map std::map _partition_slot_index_map; std::unique_ptr _row_desc; + doris::Tuple* _src_tuple; + TupleRow* _src_tuple_row; + + // Mem pool used to allocate _src_tuple and _src_tuple_row + 
std::unique_ptr _mem_pool; + + // Dest tuple descriptor and dest expr context + const TupleDescriptor* _dest_tuple_desc; // Profile RuntimeProfile* _profile; @@ -69,6 +84,19 @@ protected: int _rows = 0; int _num_of_columns_from_file; + const std::vector _pre_filter_texprs; + + std::vector _dest_vexpr_ctx; + // to filter src tuple directly. + std::unique_ptr _vpre_filter_ctx_ptr; + + // the map values of dest slot id to src slot desc + // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr + std::vector _src_slot_descs_order_by_dest; + + bool _src_block_mem_reuse = false; + bool _strict_mode; + private: Status _init_expr_ctxes(); }; diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp index da7205e8d1..a1e2cc9b56 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_text_scanner.cpp @@ -26,8 +26,9 @@ namespace doris::vectorized { NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile), + RuntimeProfile* profile, + const std::vector& pre_filter_texprs) + : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_line_reader_eof(false), @@ -45,8 +46,9 @@ Status NewFileTextScanner::open(RuntimeState* state) { Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { SCOPED_TIMER(_read_timer); - RETURN_IF_ERROR(init_block(block)); - + if (!_is_load) { + RETURN_IF_ERROR(init_block(block)); + } const int batch_size = state->batch_size(); *eof = false; int current_rows = _rows; @@ -248,4 +250,14 @@ Status NewFileTextScanner::_split_line(const Slice& line) { } return Status::OK(); } + +Status 
NewFileTextScanner::_convert_to_output_block(Block* output_block) { + if (_input_block_ptr == output_block) { + return Status::OK(); + } + if (LIKELY(_input_block_ptr->rows() > 0)) { + RETURN_IF_ERROR(_materialize_dest_block(output_block)); + } + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h index bfc112c870..ab12389450 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.h +++ b/be/src/vec/exec/scan/new_file_text_scanner.h @@ -30,13 +30,14 @@ class NewFileTextScanner : public NewFileScanner { public: NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile); + RuntimeProfile* profile, const std::vector& pre_filter_texprs); Status open(RuntimeState* state) override; protected: void _init_profiles(RuntimeProfile* profile) override {} Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + Status _convert_to_output_block(Block* output_block); private: Status _fill_file_columns(const Slice& line, vectorized::Block* _block); diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 40de8af22c..4089e5d2b2 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -30,6 +30,7 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTra _output_tuple_desc(parent->output_tuple_desc()) { _real_tuple_desc = _input_tuple_desc != nullptr ? 
_input_tuple_desc : _output_tuple_desc; _total_rf_num = _parent->runtime_filter_num(); + _is_load = (_input_tuple_desc != nullptr); } Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 3e65392080..4b356b15ed 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -63,9 +63,6 @@ protected: // Filter the output block finally. Status _filter_output_block(Block* block); - // to filter src tuple directly. - std::unique_ptr _vpre_filter_ctx_ptr; - public: VScanNode* get_parent() { return _parent; } @@ -163,7 +160,7 @@ protected: // File formats based push down predicate std::vector _conjunct_ctxs; - const std::vector _pre_filter_texprs; + bool _is_load = false; }; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 4dcc64f398..22cef54849 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1695,7 +1695,7 @@ public class Config extends ConfigBase { * Should be removed when this feature is ready. */ @ConfField(mutable = false, masterOnly = true) - public static boolean enable_multi_catalog = false; // 1 min + public static boolean enable_multi_catalog = false; @ConfField(mutable = true, masterOnly = false) public static long file_scan_node_split_size = 256 * 1024 * 1024; // 256mb @@ -1761,4 +1761,10 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec + + /** + * Temp config, should be removed when new file scan node is ready. 
+ */ + @ConfField(mutable = true) + public static boolean enable_new_load_scan_node = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index c379b268a2..673408811f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -62,7 +62,7 @@ public class BrokerFileGroup implements Writable { private static final Logger LOG = LogManager.getLogger(BrokerFileGroup.class); private long tableId; - private String valueSeparator; + private String columnSeparator; private String lineDelimiter; // fileFormat may be null, which means format will be decided by file's suffix private String fileFormat; @@ -75,7 +75,7 @@ public class BrokerFileGroup implements Writable { private List fileFieldNames; // partition columnNames - private List columnsFromPath; + private List columnNamesFromPath; // columnExprList includes all fileFieldNames, columnsFromPath and column mappings // this param will be recreated by data desc when the log replay private List columnExprList; @@ -111,7 +111,7 @@ public class BrokerFileGroup implements Writable { // Used for broker table, no need to parse public BrokerFileGroup(BrokerTable table) throws AnalysisException { this.tableId = table.getId(); - this.valueSeparator = Separator.convertSeparator(table.getColumnSeparator()); + this.columnSeparator = Separator.convertSeparator(table.getColumnSeparator()); this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter()); this.isNegative = false; this.filePaths = table.getPaths(); @@ -130,26 +130,22 @@ public class BrokerFileGroup implements Writable { /** * Should used for hive/iceberg/hudi external table. 
*/ - public BrokerFileGroup(long tableId, - String columnSeparator, - String lineDelimiter, - String filePath, - String fileFormat, - List columnsFromPath, - List columnExprList) throws AnalysisException { + public BrokerFileGroup(long tableId, String columnSeparator, String lineDelimiter, String filePath, + String fileFormat, List columnNamesFromPath, List columnExprList) + throws AnalysisException { this.tableId = tableId; - this.valueSeparator = Separator.convertSeparator(columnSeparator); + this.columnSeparator = Separator.convertSeparator(columnSeparator); this.lineDelimiter = Separator.convertSeparator(lineDelimiter); this.isNegative = false; this.filePaths = Lists.newArrayList(filePath); this.fileFormat = fileFormat; - this.columnsFromPath = columnsFromPath; + this.columnNamesFromPath = columnNamesFromPath; this.columnExprList = columnExprList; } public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); - this.columnsFromPath = dataDescription.getColumnsFromPath(); + this.columnNamesFromPath = dataDescription.getColumnsFromPath(); this.columnExprList = dataDescription.getParsedColumnExprList(); this.columnToHadoopFunction = dataDescription.getColumnToHadoopFunction(); this.precedingFilterExpr = dataDescription.getPrecdingFilterExpr(); @@ -202,9 +198,9 @@ public class BrokerFileGroup implements Writable { } // column - valueSeparator = dataDescription.getColumnSeparator(); - if (valueSeparator == null) { - valueSeparator = "\t"; + columnSeparator = dataDescription.getColumnSeparator(); + if (columnSeparator == null) { + columnSeparator = "\t"; } lineDelimiter = dataDescription.getLineDelimiter(); if (lineDelimiter == null) { @@ -213,8 +209,7 @@ public class BrokerFileGroup implements Writable { fileFormat = dataDescription.getFileFormat(); if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase("parquet") - && !fileFormat.equalsIgnoreCase(FeConstants.csv) + if 
(!fileFormat.equalsIgnoreCase("parquet") && !fileFormat.equalsIgnoreCase(FeConstants.csv) && !fileFormat.equalsIgnoreCase("orc") && !fileFormat.equalsIgnoreCase("json") && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) @@ -269,8 +264,8 @@ public class BrokerFileGroup implements Writable { return tableId; } - public String getValueSeparator() { - return valueSeparator; + public String getColumnSeparator() { + return columnSeparator; } public String getLineDelimiter() { @@ -301,8 +296,8 @@ public class BrokerFileGroup implements Writable { return filePaths; } - public List getColumnsFromPath() { - return columnsFromPath; + public List getColumnNamesFromPath() { + return columnNamesFromPath; } public List getColumnExprList() { @@ -428,10 +423,10 @@ public class BrokerFileGroup implements Writable { } sb.append("]"); } - if (columnsFromPath != null) { + if (columnNamesFromPath != null) { sb.append(",columnsFromPath=["); int idx = 0; - for (String name : columnsFromPath) { + for (String name : columnNamesFromPath) { if (idx++ != 0) { sb.append(","); } @@ -450,7 +445,7 @@ public class BrokerFileGroup implements Writable { } sb.append("]"); } - sb.append(",valueSeparator=").append(valueSeparator) + sb.append(",valueSeparator=").append(columnSeparator) .append(",lineDelimiter=").append(lineDelimiter) .append(",fileFormat=").append(fileFormat) .append(",isNegative=").append(isNegative); @@ -476,7 +471,7 @@ public class BrokerFileGroup implements Writable { // tableId out.writeLong(tableId); // valueSeparator - Text.writeString(out, valueSeparator); + Text.writeString(out, columnSeparator); // lineDelimiter Text.writeString(out, lineDelimiter); // isNegative @@ -523,7 +518,7 @@ public class BrokerFileGroup implements Writable { @Deprecated public void readFields(DataInput in) throws IOException { tableId = in.readLong(); - valueSeparator = Text.readString(in); + columnSeparator = Text.readString(in); lineDelimiter = Text.readString(in); isNegative = 
in.readBoolean(); // partitionIds diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 4d11eeb493..569fbe69de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -87,7 +87,6 @@ import org.apache.doris.system.Backend; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.task.PushTask; -import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; @@ -806,14 +805,13 @@ public class Load { * And it must be called in same db lock when planing. */ public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, - Map>> columnToHadoopFunction, - Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, List hiddenColumns, - boolean useVectorizedLoad) throws UserException { + Map>> columnToHadoopFunction, Map exprsByName, + Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad) + throws UserException { rewriteColumns(columnDescs); - initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, - srcTupleDesc, slotDescByName, params, formatType, hiddenColumns, useVectorizedLoad, true); + initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName, + srcSlotIds, formatType, hiddenColumns, useVectorizedLoad, true); } /* @@ -825,11 +823,10 @@ public class Load { * 5. 
init slot descs and expr map for load plan */ private static void initColumns(Table tbl, List columnExprs, - Map>> columnToHadoopFunction, - Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params, - TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad, - boolean needInitSlotAndAnalyzeExprs) throws UserException { + Map>> columnToHadoopFunction, Map exprsByName, + Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean useVectorizedLoad, + boolean needInitSlotAndAnalyzeExprs) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. // skip the mapping columns not exist in schema @@ -988,7 +985,7 @@ public class Load { slotDesc.setIsNullable(true); } slotDesc.setIsMaterialized(true); - params.addToSrcSlotIds(slotDesc.getId().asInt()); + srcSlotIds.add(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index dedd436a78..535ec50eed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -41,6 +41,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.ExternalFileScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TUniqueId; @@ -143,9 +144,17 @@ public class LoadingTaskPlanner { // Generate plan trees // 1. 
Broker scan node - BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc, "BrokerScanNode", + ScanNode scanNode; + if (Config.enable_new_load_scan_node) { + scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "FileScanNode"); + ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, + fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo); + } else { + scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode", fileStatusesList, filesAdded); - scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo); + ((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, + loadParallelism, userInfo); + } scanNode.init(analyzer); scanNode.finalize(analyzer); if (Config.enable_vectorized_load) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index d255c458e6..622a82d2af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -506,10 +506,8 @@ public class SparkLoadPendingTask extends LoadTask { fileGroup.isNegative(), columnMappings, where, partitionIds); } else { etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames, - fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(), - fileGroup.getLineDelimiter(), fileGroup.isNegative(), - fileGroup.getFileFormat(), columnMappings, - where, partitionIds); + fileGroup.getColumnNamesFromPath(), fileGroup.getColumnSeparator(), fileGroup.getLineDelimiter(), + fileGroup.isNegative(), fileGroup.getFileFormat(), columnMappings, where, partitionIds); } return etlFileGroup; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index b67e1e91bb..7dcf633598 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -235,11 +235,11 @@ public class BrokerScanNode extends LoadScanNode { context.params = params; BrokerFileGroup fileGroup = context.fileGroup; - params.setColumnSeparator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]); + params.setColumnSeparator(fileGroup.getColumnSeparator().getBytes(Charset.forName("UTF-8"))[0]); params.setLineDelimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]); - params.setColumnSeparatorStr(fileGroup.getValueSeparator()); + params.setColumnSeparatorStr(fileGroup.getColumnSeparator()); params.setLineDelimiterStr(fileGroup.getLineDelimiter()); - params.setColumnSeparatorLength(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8")).length); + params.setColumnSeparatorLength(fileGroup.getColumnSeparator().getBytes(Charset.forName("UTF-8")).length); params.setLineDelimiterLength(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8")).length); params.setStrictMode(strictMode); params.setProperties(brokerDesc.getProperties()); @@ -283,9 +283,8 @@ public class BrokerScanNode extends LoadScanNode { } if (targetTable != null) { - Load.initColumns(targetTable, columnDescs, - context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, - context.srcTupleDescriptor, context.slotDescByName, context.params, + Load.initColumns(targetTable, columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, + analyzer, context.srcTupleDescriptor, context.slotDescByName, context.params.getSrcSlotIds(), formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); } } @@ -514,7 +513,7 @@ public class BrokerScanNode 
extends LoadScanNode { String headerType = getHeaderType(context.fileGroup.getFileFormat()); TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, - context.fileGroup.getColumnsFromPath()); + context.fileGroup.getColumnNamesFromPath()); int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 84d388fa34..0fd3db091f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -142,9 +142,12 @@ public class StreamLoadScanNode extends LoadScanNode { } } - Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, - exprsByName, analyzer, srcTupleDesc, slotDescByName, params, - taskInfo.getFormatType(), taskInfo.getHiddenColumns(), VectorizedUtil.isVectorized()); + if (params.getSrcSlotIds() == null) { + params.setSrcSlotIds(new java.util.ArrayList()); + } + Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, exprsByName, analyzer, srcTupleDesc, + slotDescByName, params.getSrcSlotIds(), taskInfo.getFormatType(), taskInfo.getHiddenColumns(), + VectorizedUtil.isVectorized()); // analyze where statement initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java new file mode 100644 index 0000000000..739b5f4596 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.UserProperty; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; + +public class BackendPolicy { + private static final Logger LOG = LogManager.getLogger(BackendPolicy.class); + private final List backends = Lists.newArrayList(); + + private int nextBe = 0; + + public void init() throws UserException { + Set tags = Sets.newHashSet(); + if (ConnectContext.get().getCurrentUserIdentity() != null) { + String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); + tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + if (tags == UserProperty.INVALID_RESOURCE_TAGS) { + throw new UserException("No valid 
resource tag for user: " + qualifiedUser); + } + } else { + LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); + } + + // scan node is used for query + BeSelectionPolicy policy = new BeSelectionPolicy.Builder() + .needQueryAvailable() + .needLoadAvailable() + .addTags(tags) + .build(); + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (policy.isMatch(be)) { + backends.add(be); + } + } + if (backends.isEmpty()) { + throw new UserException("No available backends"); + } + Random random = new Random(System.currentTimeMillis()); + Collections.shuffle(backends, random); + } + + public Backend getNextBe() { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + return selectedBackend; + } + + public int numBackends() { + return backends.size(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index ce1993a240..e9f2f51f3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -18,60 +18,47 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ArithmeticExpr; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AggregateType; import 
org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.mysql.privilege.UserProperty; +import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.resource.Tag; +import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; -import org.apache.doris.system.Backend; -import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; -import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileScanSlotInfo; -import org.apache.doris.thrift.TFileTextScanRangeParams; -import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.THdfsParams; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; +import com.google.common.base.Preconditions; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; /** * ExternalFileScanNode for the file access type of catalog, now only support @@ -80,295 +67,301 @@ import java.util.Set; public class ExternalFileScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class); - private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001"; + public static class ParamCreateContext { + public BrokerFileGroup fileGroup; + public List conjuncts; - private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; + public TupleDescriptor destTupleDescriptor; - // Just for explain - private int inputSplitsNum = 0; - private long totalFileSize = 0; - - private static class ParamCreateContext { - public TFileScanRangeParams params; + // === Set when init === public TupleDescriptor srcTupleDescriptor; + public Map exprMap; + public Map slotDescByName; + public String timezone; + // === Set when init === + + public TFileScanRangeParams params; + } - private static class BackendPolicy { - private final List backends = Lists.newArrayList(); - - private int nextBe = 0; - - public void init() throws UserException { - Set tags = Sets.newHashSet(); - if (ConnectContext.get().getCurrentUserIdentity() != null) { - String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); - tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - if (tags == UserProperty.INVALID_RESOURCE_TAGS) { - throw new UserException("No valid resource tag for user: " + qualifiedUser); - } - } else { - 
LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); - } - - // scan node is used for query - BeSelectionPolicy policy = new BeSelectionPolicy.Builder() - .needQueryAvailable() - .needLoadAvailable() - .addTags(tags) - .build(); - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { - if (policy.isMatch(be)) { - backends.add(be); - } - } - if (backends.isEmpty()) { - throw new UserException("No available backends"); - } - Random random = new Random(System.currentTimeMillis()); - Collections.shuffle(backends, random); - } - - public Backend getNextBe() { - Backend selectedBackend = backends.get(nextBe++); - nextBe = nextBe % backends.size(); - return selectedBackend; - } - - public int numBackends() { - return backends.size(); - } - } - - private static class FileSplitStrategy { - private long totalSplitSize; - private int splitNum; - - FileSplitStrategy() { - this.totalSplitSize = 0; - this.splitNum = 0; - } - - public void update(FileSplit split) { - totalSplitSize += split.getLength(); - splitNum++; - } - - public boolean hasNext() { - return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num; - } - - public void next() { - totalSplitSize = 0; - splitNum = 0; - } + public enum Type { + LOAD, QUERY } + private Type type = Type.QUERY; private final BackendPolicy backendPolicy = new BackendPolicy(); - private final ParamCreateContext context = new ParamCreateContext(); + // Only for load job. + // Save all info about load attributes and files. + // Each DataDescription in a load stmt conreponding to a FileGroupInfo in this list. + private List fileGroupInfos = Lists.newArrayList(); + // For query, there is only one FileScanProvider in this list. + // For load, the num of providers equals to the num of file group infos. + private List scanProviders = Lists.newArrayList(); + // For query, there is only one ParamCreateContext in this list. 
+ // For load, the num of ParamCreateContext equals to the num of file group infos. + private List contexts = Lists.newArrayList(); - private final List partitionKeys = new ArrayList<>(); + // Final output of this file scan node + private List scanRangeLocations = Lists.newArrayList(); - private List scanRangeLocations; - - private final HMSExternalTable hmsTable; - - private ExternalFileScanProvider scanProvider; + // For explain + private long inputSplitsNum = 0; + private long totalFileSize = 0; /** - * External file scan node for hms table. + * External file scan node for: + * 1. Query hms table + * 2. Load from file */ - public ExternalFileScanNode( - PlanNodeId id, - TupleDescriptor desc, - String planNodeName) { - + public ExternalFileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE); + } - this.hmsTable = (HMSExternalTable) this.desc.getTable(); - - switch (this.hmsTable.getDlaType()) { - case HUDI: - this.scanProvider = new ExternalHudiScanProvider(this.hmsTable); - break; - case ICEBERG: - this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable); - break; - case HIVE: - this.scanProvider = new ExternalHiveScanProvider(this.hmsTable); - break; - default: - LOG.warn("Unknown table for dla."); + // Only for load job. 
+ public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, + List fileGroups, List> fileStatusesList, int filesAdded, + boolean strictMode, int loadParallelism, UserIdentity userIdentity) { + Preconditions.checkState(fileGroups.size() == fileStatusesList.size()); + for (int i = 0; i < fileGroups.size(); ++i) { + FileGroupInfo fileGroupInfo = new FileGroupInfo(loadJobId, txnId, targetTable, brokerDesc, + fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism, userIdentity); + fileGroupInfos.add(fileGroupInfo); } + this.type = Type.LOAD; } @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - if (hmsTable.isView()) { - throw new AnalysisException(String.format("Querying external view '[%s].%s.%s' is not supported", - hmsTable.getDlaType(), hmsTable.getDbName(), hmsTable.getName())); + + switch (type) { + case QUERY: + HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); + Preconditions.checkNotNull(hmsTable); + + if (hmsTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), + hmsTable.getDbName(), hmsTable.getName())); + } + + FileScanProviderIf scanProvider; + switch (hmsTable.getDlaType()) { + case HUDI: + scanProvider = new HudiScanProvider(hmsTable, desc); + break; + case ICEBERG: + scanProvider = new IcebergScanProvider(hmsTable, desc); + break; + case HIVE: + scanProvider = new HiveScanProvider(hmsTable, desc); + break; + default: + throw new UserException("Unknown table type: " + hmsTable.getDlaType()); + } + this.scanProviders.add(scanProvider); + break; + case LOAD: + for (FileGroupInfo fileGroupInfo : fileGroupInfos) { + this.scanProviders.add(new LoadScanProvider(fileGroupInfo)); + } + break; + default: + throw new UserException("Unknown type: " + type); } + backendPolicy.init(); numNodes = backendPolicy.numBackends(); - initContext(); + + 
initParamCreateContexts(analyzer); } - private void initContext() throws DdlException, MetaNotFoundException { - context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); - context.params = new TFileScanRangeParams(); - if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) { - Map serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); - String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) - ? HIVE_DEFAULT_COLUMN_SEPARATOR - : serDeInfoParams.get("field.delim"); - String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim")) - ? HIVE_DEFAULT_LINE_DELIMITER - : serDeInfoParams.get("line.delim"); + // For each scan provider, create a corresponding ParamCreateContext + private void initParamCreateContexts(Analyzer analyzer) throws UserException { + for (FileScanProviderIf scanProvider : scanProviders) { + ParamCreateContext context = scanProvider.createContext(analyzer); + // set where and preceding filter. + // FIXME(cmy): we should support set different expr for different file group. 
+ initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); + initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer); + context.destTupleDescriptor = desc; + context.conjuncts = conjuncts; + this.contexts.add(context); + } + } - TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setLineDelimiterStr(lineDelimiter); - textParams.setColumnSeparatorStr(columnSeparator); + private void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addPreFilterConjuncts(newWhereExpr.getConjuncts()); + } + } - context.params.setTextParams(textParams); + private void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) + throws UserException { + Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer); + if (newWhereExpr != null) { + addConjuncts(newWhereExpr.getConjuncts()); + } + } + + private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException { + if (whereExpr == null) { + return null; } - context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); - // Need re compute memory layout after set some slot descriptor to nullable - context.srcTupleDescriptor.computeStatAndMemLayout(); - - Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - - List columns = hmsTable.getBaseSchema(false); - for (Column column : columns) { - SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); - slotDesc.setType(column.getType()); - slotDesc.setIsMaterialized(true); - slotDesc.setIsNullable(true); - slotDesc.setColumn(new Column(column)); - slotDescByName.put(column.getName(), slotDesc); + Map dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for 
(SlotDescriptor slotDescriptor : tupleDesc.getSlots()) { + dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); } - // Hive table must extract partition value from path and hudi/iceberg table keep - // partition field in file. - partitionKeys.addAll(scanProvider.getPathPartitionKeys()); - context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); - for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.isMaterialized()) { - continue; + // substitute SlotRef in filter expression + // where expr must be equal first to transfer some predicates(eg: BetweenPredicate to BinaryPredicate) + Expr newWhereExpr = analyzer.getExprRewriter() + .rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE); + List slots = Lists.newArrayList(); + newWhereExpr.collect(SlotRef.class, slots); + + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + for (SlotRef slot : slots) { + SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName()); + if (slotDesc == null) { + throw new UserException( + "unknown column reference in where statement, reference=" + slot.getColumnName()); } - int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); - - TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); - slotInfo.setSlotId(slotId); - slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); - - context.params.addToRequiredSlots(slotInfo); + smap.getLhs().add(slot); + smap.getRhs().add(new SlotRef(slotDesc)); } + newWhereExpr = newWhereExpr.clone(smap); + newWhereExpr.analyze(analyzer); + if (!newWhereExpr.getType().equals(org.apache.doris.catalog.Type.BOOLEAN)) { + throw new UserException("where statement is not a valid statement return bool"); + } + return newWhereExpr; } @Override public void finalize(Analyzer analyzer) throws UserException { - try { - buildScanRange(); - } catch (IOException e) { - LOG.warn("Finalize failed.", e); - throw new UserException("Finalize failed: " + e.getMessage()); + 
Preconditions.checkState(contexts.size() == scanProviders.size(), + contexts.size() + " vs. " + scanProviders.size()); + for (int i = 0; i < contexts.size(); ++i) { + ParamCreateContext context = contexts.get(i); + finalizeParamsForLoad(context, analyzer); + FileScanProviderIf scanProvider = scanProviders.get(i); + createScanRangeLocations(context, scanProvider); + this.inputSplitsNum += scanProvider.getInputSplitNum(); + this.totalFileSize += scanProvider.getInputFileSize(); } } - // If fileFormat is not null, we use fileFormat instead of check file's suffix - private void buildScanRange() throws UserException, IOException { - scanRangeLocations = Lists.newArrayList(); - List inputSplits = scanProvider.getSplits(conjuncts); - if (inputSplits.isEmpty()) { + protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyzer) throws UserException { + if (type != Type.LOAD) { return; } - inputSplitsNum = inputSplits.size(); + Map slotDescByName = context.slotDescByName; + Map exprMap = context.exprMap; + TupleDescriptor srcTupleDesc = context.srcTupleDescriptor; + boolean negative = context.fileGroup.isNegative(); - String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); - String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); - String fsName = fullPath.replace(filePath, ""); - context.params.setFileType(scanProvider.getTableFileType()); - context.params.setFormatType(scanProvider.getTableFormatType()); - // set hdfs params for hdfs file type. 
- if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties()); - tHdfsParams.setFsName(fsName); - context.params.setHdfsParams(tHdfsParams); - } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) { - context.params.setProperties(hmsTable.getS3Properties()); + TFileScanRangeParams params = context.params; + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : desc.getSlots()) { + if (!destSlotDesc.isMaterialized()) { + continue; + } + Expr expr = null; + if (exprMap != null) { + expr = exprMap.get(destSlotDesc.getColumn().getName()); + } + if (expr == null) { + SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); + if (srcSlotDesc != null) { + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + // If dest is allow null, we set source to nullable + if (destSlotDesc.getColumn().isAllowNull()) { + srcSlotDesc.setIsNullable(true); + } + expr = new SlotRef(srcSlotDesc); + } else { + Column column = destSlotDesc.getColumn(); + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + } + } + + // check hll_hash + if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + if 
(!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() + .getFunction().equalsIgnoreCase("hll_empty")) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=hll_empty()"); + } + expr.setType(org.apache.doris.catalog.Type.HLL); + } + + checkBitmapCompatibility(analyzer, destSlotDesc, expr); + checkQuantileStateCompatibility(analyzer, destSlotDesc, expr); + + if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { + expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); + expr.analyze(analyzer); + } + expr = castToSlot(destSlotDesc, expr); + params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); } + params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); + params.setDestTupleId(desc.getId().asInt()); + params.setSrcTupleId(srcTupleDesc.getId().asInt()); - TScanRangeLocations curLocations = newLocations(context.params); + // Need re compute memory layout after set some slot descriptor to nullable + srcTupleDesc.computeStatAndMemLayout(); + } - FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); - - for (InputSplit split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - totalFileSize += split.getLength(); - - List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - partitionKeys, false); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.info("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " - + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" - + " loaction: " + 
Joiner.on("|").join(split.getLocations())); - - fileSplitStrategy.update(fileSplit); - // Add a new location when it's can be split - if (fileSplitStrategy.hasNext()) { - scanRangeLocations.add(curLocations); - curLocations = newLocations(context.params); - fileSplitStrategy.next(); + protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isBitmapType()) { + String errorMsg = String.format("bitmap column %s require the function return type is BITMAP", + slotDesc.getColumn().getName()); + throw new AnalysisException(errorMsg); } } - if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { - scanRangeLocations.add(curLocations); + } + + protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr) + throws AnalysisException { + if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) { + expr.analyze(analyzer); + if (!expr.getType().isQuantileStateType()) { + String errorMsg = "quantile_state column %s require the function return type is QUANTILE_STATE"; + throw new AnalysisException(errorMsg); + } } } - private TScanRangeLocations newLocations(TFileScanRangeParams params) { - // Generate on file scan range - TFileScanRange fileScanRange = new TFileScanRange(); - fileScanRange.setParams(params); - - // Scan range - TExternalScanRange externalScanRange = new TExternalScanRange(); - externalScanRange.setFileScanRange(fileScanRange); - TScanRange scanRange = new TScanRange(); - scanRange.setExtScanRange(externalScanRange); - - // Locations - TScanRangeLocations locations = new TScanRangeLocations(); - locations.setScanRange(scanRange); - - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend = backendPolicy.getNextBe(); - 
location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - locations.addToLocations(location); - - return locations; - } - - private TFileRangeDesc createFileRangeDesc( - FileSplit fileSplit, - List columnsFromPath) throws DdlException, MetaNotFoundException { - TFileRangeDesc rangeDesc = new TFileRangeDesc(); - rangeDesc.setStartOffset(fileSplit.getStart()); - rangeDesc.setSize(fileSplit.getLength()); - rangeDesc.setColumnsFromPath(columnsFromPath); - - if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) { - rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) { - rangeDesc.setPath(fileSplit.getPath().toString()); - } - return rangeDesc; + private void createScanRangeLocations(ParamCreateContext context, FileScanProviderIf scanProvider) + throws UserException { + scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } @Override @@ -401,9 +394,8 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(prefix).append("table: ").append(hmsTable.getDbName()).append(".").append(hmsTable.getName()) - .append("\n").append(prefix).append("hms url: ").append(scanProvider.getMetaStoreUrl()).append("\n"); + StringBuilder output = new StringBuilder(prefix); + // output.append(fileTable.getExplainString(prefix)); if (!conjuncts.isEmpty()) { output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java deleted file mode 100644 index 2dd9acd000..0000000000 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java +++ /dev/null @@ -1,186 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.planner.external; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.HiveBucketUtil; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.external.hive.util.HiveUtil; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import 
org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * A HiveScanProvider to get information for scan node. - */ -public class ExternalHiveScanProvider implements ExternalFileScanProvider { - private static final Logger LOG = LogManager.getLogger(ExternalHiveScanProvider.class); - - protected HMSExternalTable hmsTable; - - public ExternalHiveScanProvider(HMSExternalTable hmsTable) { - this.hmsTable = hmsTable; - } - - @Override - public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { - TFileFormatType type = null; - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); - String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); - if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { - type = TFileFormatType.FORMAT_PARQUET; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { - type = TFileFormatType.FORMAT_ORC; - } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { - type = TFileFormatType.FORMAT_CSV_PLAIN; - } - return type; - } - - @Override - public TFileType getTableFileType() throws DdlException, MetaNotFoundException { - String location = hmsTable.getRemoteTable().getSd().getLocation(); - if (location != null && !location.isEmpty()) { - if (location.startsWith("s3a") || location.startsWith("s3n")) { - return TFileType.FILE_S3; - } else if (location.startsWith("hdfs:")) { - return TFileType.FILE_HDFS; - } - } - throw new DdlException("Unknown file type for hms table."); - } - - @Override - public String getMetaStoreUrl() { - return hmsTable.getMetastoreUri(); - } - - @Override - public List getSplits(List exprs) 
- throws IOException, UserException { - String splitsPath = getRemoteHiveTable().getSd().getLocation(); - List partitionKeys = getRemoteHiveTable().getPartitionKeys() - .stream().map(FieldSchema::getName).collect(Collectors.toList()); - List hivePartitions = new ArrayList<>(); - - if (partitionKeys.size() > 0) { - ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr( - exprs, partitionKeys, hmsTable.getName()); - - String metaStoreUris = getMetaStoreUrl(); - hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions( - metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate)); - } - - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); - - Configuration configuration = setConfiguration(); - InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); - List splits; - if (!hivePartitions.isEmpty()) { - try { - splits = hivePartitions.stream().flatMap(x -> { - try { - return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } catch (RuntimeException e) { - throw new IOException(e); - } - } else { - splits = getSplitsByPath(inputFormat, configuration, splitsPath); - } - return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs, - getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(), - getRemoteHiveTable().getParameters()); - } - - private List getSplitsByPath(InputFormat inputFormat, Configuration configuration, - String splitsPath) throws IOException { - JobConf jobConf = new JobConf(configuration); - // For Tez engine, it may generate subdirectoies for "union" query. - // So there may be files and directories in the table directory at the same time. 
eg: - // /user/hive/warehouse/region_tmp_union_all2/000000_0 - // /user/hive/warehouse/region_tmp_union_all2/1 - // /user/hive/warehouse/region_tmp_union_all2/2 - // So we need to set this config to support visit dir recursively. - // Otherwise, getSplits() may throw exception: "Not a file xxx" - // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); - FileInputFormat.setInputPaths(jobConf, splitsPath); - InputSplit[] splits = inputFormat.getSplits(jobConf, 0); - return Lists.newArrayList(splits); - } - - - protected Configuration setConfiguration() { - Configuration conf = new HdfsConfiguration(); - Map dfsProperties = hmsTable.getDfsProperties(); - for (Map.Entry entry : dfsProperties.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - Map s3Properties = hmsTable.getS3Properties(); - for (Map.Entry entry : s3Properties.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - return conf; - } - - @Override - public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { - return hmsTable.getRemoteTable(); - } - - @Override - public Map getTableProperties() throws MetaNotFoundException { - Map properteis = Maps.newHashMap(hmsTable.getRemoteTable().getParameters()); - properteis.putAll(hmsTable.getDfsProperties()); - return properteis; - } - - @Override - public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java new file mode 100644 index 0000000000..529f575a47 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import 
org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/** + * FileTable encapsulates a set of files to be scanned into a Table like structure, + * which has table attributes such as Schema, and file attributes such as row and column separators. + * Mainly used to unify HMSTableScan and FileScan + */ +public class FileGroupInfo { + private static final Logger LOG = LogManager.getLogger(FileGroupInfo.class); + + private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001"; + private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n"; + + private long loadJobId; + private long txnId; + private Table targetTable; + private BrokerDesc brokerDesc; + private BrokerFileGroup fileGroup; + private List fileStatuses; + private int filesAdded; + private boolean strictMode; + private int loadParallelism; + private UserIdentity userIdentity; + // set by getFileStatusAndCalcInstance + long bytesPerInstance = 0; + + public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, + BrokerFileGroup fileGroup, List fileStatuses, int filesAdded, boolean strictMode, + int loadParallelism, UserIdentity userIdentity) { + this.loadJobId = loadJobId; + this.txnId = txnId; + this.targetTable = targetTable; + this.brokerDesc = brokerDesc; + this.fileGroup = fileGroup; + this.fileStatuses = fileStatuses; + this.filesAdded = filesAdded; + this.strictMode = strictMode; + this.loadParallelism = loadParallelism; + this.userIdentity = userIdentity; + } + + public Table getTargetTable() { + return targetTable; + } + + public BrokerDesc getBrokerDesc() { + return brokerDesc; + } + + public BrokerFileGroup getFileGroup() { + return fileGroup; + } + + public List getFileStatuses() { + return fileStatuses; + } + + public boolean isStrictMode() { + return strictMode; + } + + public int getLoadParallelism() { + return loadParallelism; + } + + public UserIdentity getUserIdentity() { + 
return userIdentity; + } + + public String getExplainString(String prefix) { + StringBuilder sb = new StringBuilder(); + sb.append("file scan\n"); + return sb.toString(); + } + + public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy) throws UserException { + if (filesAdded == 0) { + throw new UserException("No source file in this table(" + targetTable.getName() + ")."); + } + + long totalBytes = 0; + for (TBrokerFileStatus fileStatus : fileStatuses) { + totalBytes += fileStatus.size; + } + int numInstances = (int) (totalBytes / Config.min_bytes_per_broker_scanner); + int totalLoadParallelism = loadParallelism * backendPolicy.numBackends(); + numInstances = Math.min(totalLoadParallelism, numInstances); + numInstances = Math.min(numInstances, Config.max_broker_concurrency); + numInstances = Math.max(1, numInstances); + + bytesPerInstance = totalBytes / numInstances + 1; + if (bytesPerInstance > Config.max_bytes_per_broker_scanner) { + throw new UserException("Scan bytes per file scanner exceed limit: " + Config.max_bytes_per_broker_scanner); + } + LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance); + } + + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy); + long curInstanceBytes = 0; + long curFileOffset = 0; + for (int i = 0; i < fileStatuses.size(); ) { + TBrokerFileStatus fileStatus = fileStatuses.get(i); + long leftBytes = fileStatus.size - curFileOffset; + long tmpBytes = curInstanceBytes + leftBytes; + // header_type + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnNamesFromPath()); + if (tmpBytes > bytesPerInstance) { + // Now only support split plain text + 
if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) + || formatType == TFileFormatType.FORMAT_JSON) { + long rangeBytes = bytesPerInstance - curInstanceBytes; + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes, + columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset += rangeBytes; + } else { + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, + columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset = 0; + i++; + } + + // New one scan + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, brokerDesc, backendPolicy); + curInstanceBytes = 0; + + } else { + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset = 0; + curInstanceBytes += leftBytes; + i++; + } + } + + // Put the last file + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().isSetRanges()) { + scanRangeLocations.add(curLocations); + } + } + + protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDesc brokerDesc, + BackendPolicy backendPolicy) throws UserException { + + Backend selectedBackend = backendPolicy.getNextBe(); + + // Generate one file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + + if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { + FsBroker broker = null; + try { + broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerDesc.getName(), selectedBackend.getHost()); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } else { + params.setBrokerAddresses(new ArrayList<>()); + } + 
fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private String getHeaderType(String formatType) { + if (formatType != null) { + if (formatType.toLowerCase().equals(FeConstants.csv_with_names) || formatType.toLowerCase() + .equals(FeConstants.csv_with_names_and_types)) { + return formatType; + } + } + return ""; + } + + private TFileFormatType formatType(String fileFormat, String path) throws UserException { + if (fileFormat != null) { + if (fileFormat.toLowerCase().equals("parquet")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (fileFormat.toLowerCase().equals("orc")) { + return TFileFormatType.FORMAT_ORC; + } else if (fileFormat.toLowerCase().equals("json")) { + return TFileFormatType.FORMAT_JSON; + // csv/csv_with_name/csv_with_names_and_types treat as csv format + } else if (fileFormat.toLowerCase().equals(FeConstants.csv) || fileFormat.toLowerCase() + .equals(FeConstants.csv_with_names) || fileFormat.toLowerCase() + .equals(FeConstants.csv_with_names_and_types) + // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. 
+ || fileFormat.toLowerCase().equals(FeConstants.text)) { + return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); + } + } + + String lowerCasePath = path.toLowerCase(); + if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerCasePath.endsWith(".gz")) { + return TFileFormatType.FORMAT_CSV_GZ; + } else if (lowerCasePath.endsWith(".bz2")) { + return TFileFormatType.FORMAT_CSV_BZ2; + } else if (lowerCasePath.endsWith(".lz4")) { + return TFileFormatType.FORMAT_CSV_LZ4FRAME; + } else if (lowerCasePath.endsWith(".lzo")) { + return TFileFormatType.FORMAT_CSV_LZOP; + } else if (lowerCasePath.endsWith(".deflate")) { + return TFileFormatType.FORMAT_CSV_DEFLATE; + } else { + return TFileFormatType.FORMAT_CSV_PLAIN; + } + } + + private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes, + List columnsFromPath) { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setPath(fileStatus.path); + rangeDesc.setStartOffset(curFileOffset); + rangeDesc.setSize(rangeBytes); + rangeDesc.setColumnsFromPath(columnsFromPath); + return rangeDesc; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java new file mode 100644 index 0000000000..700d8be098 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TScanRangeLocations; + +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public interface FileScanProviderIf { + // Return parquet/orc/text, etc. + TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException; + + // Return S3/HDSF, etc. + TFileType getLocationType() throws DdlException, MetaNotFoundException; + + // Return file list + List getSplits(List exprs) throws IOException, UserException; + + // return properties for S3/HDFS, etc. 
+ Map getLocationProperties() throws MetaNotFoundException, DdlException; + + List getPathPartitionKeys() throws DdlException, MetaNotFoundException; + + ParamCreateContext createContext(Analyzer analyzer) throws UserException; + + void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException; + + int getInputSplitNum(); + + long getInputFileSize(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java new file mode 100644 index 0000000000..83a6d3d49c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.common.Config; + +import org.apache.hadoop.mapred.FileSplit; + +public class FileSplitStrategy { + private long totalSplitSize; + private int splitNum; + + FileSplitStrategy() { + this.totalSplitSize = 0; + this.splitNum = 0; + } + + public void update(FileSplit split) { + totalSplitSize += split.getLength(); + splitNum++; + } + + public boolean hasNext() { + return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num; + } + + public void next() { + totalSplitSize = 0; + splitNum = 0; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java index 36e11d8845..b602722a31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProviderIf.java @@ -17,35 +17,20 @@ package org.apache.doris.planner.external; -import org.apache.doris.analysis.Expr; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.mapred.InputSplit; -import java.io.IOException; -import java.util.List; import java.util.Map; /** - * An interface for file scan node to get the need information. + * An interface for hms table scan node to get the need information. 
*/ -public interface ExternalFileScanProvider { - TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException; - - TFileType getTableFileType() throws DdlException, MetaNotFoundException; - +public interface HMSTableScanProviderIf extends FileScanProviderIf { String getMetaStoreUrl(); - List getSplits(List exprs) throws IOException, UserException; - Table getRemoteHiveTable() throws DdlException, MetaNotFoundException; Map getTableProperties() throws MetaNotFoundException; - - List getPathPartitionKeys() throws DdlException, MetaNotFoundException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java new file mode 100644 index 0000000000..ac72f7a274 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveBucketUtil; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TExternalScanRange; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRange; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; 
+import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A HiveScanProvider to get information for scan node. + */ +public class HiveScanProvider implements HMSTableScanProviderIf { + private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class); + + protected HMSExternalTable hmsTable; + + protected int inputSplitNum = 0; + protected long inputFileSize = 0; + protected final TupleDescriptor desc; + + public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + this.hmsTable = hmsTable; + this.desc = desc; + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + TFileFormatType type = null; + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName); + if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) { + type = TFileFormatType.FORMAT_ORC; + } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) { + type = TFileFormatType.FORMAT_CSV_PLAIN; + } + return type; + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + String location = hmsTable.getRemoteTable().getSd().getLocation(); + if (location != null && !location.isEmpty()) { + if (location.startsWith("s3a") || location.startsWith("s3n")) { + return TFileType.FILE_S3; + } else if (location.startsWith("hdfs:")) { + return TFileType.FILE_HDFS; + } + } + throw new 
DdlException("Unknown file location for hms table."); + } + + @Override + public String getMetaStoreUrl() { + return hmsTable.getMetastoreUri(); + } + + @Override + public List getSplits(List exprs) throws IOException, UserException { + String splitsPath = getRemoteHiveTable().getSd().getLocation(); + List partitionKeys = getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toList()); + List hivePartitions = new ArrayList<>(); + + if (partitionKeys.size() > 0) { + ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(exprs, + partitionKeys, hmsTable.getName()); + + String metaStoreUris = getMetaStoreUrl(); + hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, getRemoteHiveTable(), + hivePartitionPredicate)); + } + + String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + + Configuration configuration = setConfiguration(); + InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); + List splits; + if (!hivePartitions.isEmpty()) { + try { + splits = hivePartitions.stream().flatMap(x -> { + try { + return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } catch (RuntimeException e) { + throw new IOException(e); + } + } else { + splits = getSplitsByPath(inputFormat, configuration, splitsPath); + } + return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs, + getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(), + getRemoteHiveTable().getParameters()); + } + + private List getSplitsByPath(InputFormat inputFormat, Configuration configuration, + String splitsPath) throws IOException { + JobConf jobConf = new JobConf(configuration); + // For Tez engine, it may generate subdirectories for "union" query.
+ // So there may be files and directories in the table directory at the same time. eg: + // /user/hive/warehouse/region_tmp_union_all2/000000_0 + // /user/hive/warehouse/region_tmp_union_all2/1 + // /user/hive/warehouse/region_tmp_union_all2/2 + // So we need to set this config to support visit dir recursively. + // Otherwise, getSplits() may throw exception: "Not a file xxx" + // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 + jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); + FileInputFormat.setInputPaths(jobConf, splitsPath); + InputSplit[] splits = inputFormat.getSplits(jobConf, 0); + return Lists.newArrayList(splits); + } + + + protected Configuration setConfiguration() { + Configuration conf = new HdfsConfiguration(); + Map dfsProperties = hmsTable.getDfsProperties(); + for (Map.Entry entry : dfsProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + Map s3Properties = hmsTable.getS3Properties(); + for (Map.Entry entry : s3Properties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + @Override + public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { + return hmsTable.getRemoteTable(); + } + + @Override + public Map getTableProperties() throws MetaNotFoundException { + // TODO: implement it when we really need properties from remote table.
+ return Maps.newHashMap(); + } + + @Override + public Map getLocationProperties() throws MetaNotFoundException, DdlException { + TFileType locationType = getLocationType(); + if (locationType == TFileType.FILE_S3) { + return hmsTable.getS3Properties(); + } else if (locationType == TFileType.FILE_HDFS) { + return hmsTable.getDfsProperties(); + } else { + return Maps.newHashMap(); + } + } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + } + + @Override + public ParamCreateContext createContext(Analyzer analyzer) throws UserException { + ParamCreateContext context = new ParamCreateContext(); + context.params = new TFileScanRangeParams(); + context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); + context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt()); + context.fileGroup = new BrokerFileGroup(hmsTable.getId(), + hmsTable.getRemoteTable().getSd().getLocation(), hmsTable.getRemoteTable().getSd().getInputFormat()); + Map slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + List columns = hmsTable.getBaseSchema(false); + for (Column column : columns) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); + slotDesc.setType(column.getType()); + slotDesc.setIsMaterialized(true); + slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(column)); + slotDescByName.put(column.getName(), slotDesc); + } + // Need re compute memory layout after set some slot descriptor to nullable + context.srcTupleDescriptor.computeStatAndMemLayout(); + + // Hive table must extract partition value from path and hudi/iceberg table keep + // partition field in file. 
+ List partitionKeys = getPathPartitionKeys(); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt(); + + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slotId); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + context.params.addToRequiredSlots(slotInfo); + } + return context; + } + + @Override + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + try { + List inputSplits = getSplits(context.conjuncts); + this.inputSplitNum = inputSplits.size(); + if (inputSplits.isEmpty()) { + return; + } + + String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString(); + String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath(); + String fsName = fullPath.replace(filePath, ""); + TFileType locationType = getLocationType(); + context.params.setFileType(locationType); + context.params.setFormatType(getFileFormatType()); + // set hdfs params for hdfs file type. 
+ Map locationProperties = getLocationProperties(); + if (locationType == TFileType.FILE_HDFS) { + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties); + tHdfsParams.setFsName(fsName); + context.params.setHdfsParams(tHdfsParams); + } else if (locationType == TFileType.FILE_S3) { + context.params.setProperties(locationProperties); + } + + TScanRangeLocations curLocations = newLocations(context.params, backendPolicy); + + FileSplitStrategy fileSplitStrategy = new FileSplitStrategy(); + + for (InputSplit split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; + List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + getPathPartitionKeys(), false); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + LOG.info( + "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" + + " loaction: " + Joiner.on("|").join(split.getLocations())); + + fileSplitStrategy.update(fileSplit); + // Add a new location when it's can be split + if (fileSplitStrategy.hasNext()) { + scanRangeLocations.add(curLocations); + curLocations = newLocations(context.params, backendPolicy); + fileSplitStrategy.next(); + } + this.inputFileSize += fileSplit.getLength(); + } + if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { + scanRangeLocations.add(curLocations); + } + } catch (IOException e) { + throw new UserException(e); + } + } + + @Override + public int getInputSplitNum() { + return this.inputSplitNum; + } + + @Override + public long getInputFileSize() { + return this.inputFileSize; + } + + private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) { + // Generate on file scan range + 
TFileScanRange fileScanRange = new TFileScanRange(); + fileScanRange.setParams(params); + + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend selectedBackend = backendPolicy.getNextBe(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + + return locations; + } + + private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List columnsFromPath) + throws DdlException, MetaNotFoundException { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setColumnsFromPath(columnsFromPath); + + if (getLocationType() == TFileType.FILE_HDFS) { + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + } else if (getLocationType() == TFileType.FILE_S3) { + rangeDesc.setPath(fileSplit.getPath().toString()); + } + return rangeDesc; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java similarity index 83% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java index c32a2ac3d7..d4c95b2549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java @@ -17,6 +17,7 @@ package 
org.apache.doris.planner.external; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -29,14 +30,14 @@ import java.util.List; * A file scan provider for hudi. * HudiProvier is extended with hive since they both use input format interface to get the split. */ -public class ExternalHudiScanProvider extends ExternalHiveScanProvider { +public class HudiScanProvider extends HiveScanProvider { - public ExternalHudiScanProvider(HMSExternalTable hmsTable) { - super(hmsTable); + public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + super(hmsTable, desc); } @Override - public TFileFormatType getTableFormatType() throws DdlException { + public TFileFormatType getFileFormatType() throws DdlException { return TFileFormatType.FORMAT_PARQUET; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index 10fd85b242..f82e277f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -45,14 +46,14 @@ import java.util.Map; /** * A file scan provider for iceberg. 
*/ -public class ExternalIcebergScanProvider extends ExternalHiveScanProvider { +public class IcebergScanProvider extends HiveScanProvider { - public ExternalIcebergScanProvider(HMSExternalTable hmsTable) { - super(hmsTable); + public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + super(hmsTable, desc); } @Override - public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException { + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { TFileFormatType type; String icebergFormat = getRemoteHiveTable().getParameters() diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java new file mode 100644 index 0000000000..8843ff4462 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.VectorizedUtil; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.task.LoadTaskInfo; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class LoadScanProvider implements FileScanProviderIf { + + FileGroupInfo fileGroupInfo; + + public LoadScanProvider(FileGroupInfo fileGroupInfo) { + this.fileGroupInfo = fileGroupInfo; + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + return null; + } + + @Override + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + return 
null; + } + + @Override + public List getSplits(List exprs) throws IOException, UserException { + return null; + } + + @Override + public Map getLocationProperties() throws MetaNotFoundException, DdlException { + return null; + } + + @Override + public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { + return null; + } + + @Override + public ParamCreateContext createContext(Analyzer analyzer) throws UserException { + ParamCreateContext ctx = new ParamCreateContext(); + ctx.fileGroup = fileGroupInfo.getFileGroup(); + ctx.timezone = analyzer.getTimezone(); + + TFileScanRangeParams params = new TFileScanRangeParams(); + params.setStrictMode(fileGroupInfo.isStrictMode()); + params.setProperties(fileGroupInfo.getBrokerDesc().getProperties()); + if (fileGroupInfo.getBrokerDesc().getFileType() == TFileType.FILE_HDFS) { + THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(fileGroupInfo.getBrokerDesc().getProperties()); + params.setHdfsParams(tHdfsParams); + } + TFileAttributes fileAttributes = new TFileAttributes(); + setFileAttributes(ctx.fileGroup, fileAttributes); + params.setFileAttributes(fileAttributes); + ctx.params = params; + + initColumns(ctx, analyzer); + return ctx; + } + + public void setFileAttributes(BrokerFileGroup fileGroup, TFileAttributes fileAttributes) { + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + textParams.setColumnSeparator(fileGroup.getColumnSeparator()); + textParams.setLineDelimiter(fileGroup.getLineDelimiter()); + fileAttributes.setTextParams(textParams); + fileAttributes.setStripOuterArray(fileGroup.isStripOuterArray()); + fileAttributes.setJsonpaths(fileGroup.getJsonPaths()); + fileAttributes.setJsonRoot(fileGroup.getJsonRoot()); + fileAttributes.setNumAsString(fileGroup.isNumAsString()); + fileAttributes.setFuzzyParse(fileGroup.isFuzzyParse()); + fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine()); + fileAttributes.setReadByColumnDef(true); + 
fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat())); + } + + private String getHeaderType(String formatType) { + if (formatType != null) { + if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase( + FeConstants.csv_with_names_and_types)) { + return formatType; + } + } + return ""; + } + + @Override + public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + Preconditions.checkNotNull(fileGroupInfo); + fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy); + fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations); + } + + @Override + public int getInputSplitNum() { + return fileGroupInfo.getFileStatuses().size(); + } + + @Override + public long getInputFileSize() { + long res = 0; + for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) { + res = fileStatus.getSize(); + } + return res; + } + + /** + * This method is used to calculate the slotDescByName and exprMap. + * The expr in exprMap is analyzed in this function. + * The smap of slot which belongs to expr will be analyzed by src desc. + * slotDescByName: the single slot from columns in load stmt + * exprMap: the expr from column mapping in load stmt. 
+ * + * @param context + * @throws UserException + */ + private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException { + context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor(); + context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + // for load job, column exprs is got from file group + // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns" + LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs(); + // fileGroup.getColumnExprList() contains columns from path + columnDescs.descs = context.fileGroup.getColumnExprList(); + if (context.fileGroup.getMergeType() == LoadTask.MergeType.MERGE) { + columnDescs.descs.add( + ImportColumnDesc.newDeleteSignImportColumnDesc(context.fileGroup.getDeleteCondition())); + } else if (context.fileGroup.getMergeType() == LoadTask.MergeType.DELETE) { + columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); + } + // add columnExpr for sequence column + if (context.fileGroup.hasSequenceCol()) { + columnDescs.descs.add( + new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, context.fileGroup.getSequenceCol()))); + } + List srcSlotIds = Lists.newArrayList(); + Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), + context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds, + formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized()); + + int numColumnsFromFile = srcSlotIds.size() - context.fileGroup.getColumnNamesFromPath().size(); + Preconditions.checkState(numColumnsFromFile >= 0, + "srcSlotIds.size is: " + srcSlotIds.size() + ", num columns from path: " + + context.fileGroup.getColumnNamesFromPath().size()); + context.params.setNumOfColumnsFromFile(numColumnsFromFile); + for 
(int i = 0; i < srcSlotIds.size(); ++i) { + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(srcSlotIds.get(i)); + slotInfo.setIsFileSlot(i < numColumnsFromFile); + context.params.addToRequiredSlots(slotInfo); + } + } + + private TFileFormatType formatType(String fileFormat, String path) throws UserException { + if (fileFormat != null) { + String lowerFileFormat = fileFormat.toLowerCase(); + if (lowerFileFormat.equals("parquet")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerFileFormat.equals("orc")) { + return TFileFormatType.FORMAT_ORC; + } else if (lowerFileFormat.equals("json")) { + return TFileFormatType.FORMAT_JSON; + // csv/csv_with_name/csv_with_names_and_types treat as csv format + } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) + || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) + // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. + || lowerFileFormat.equals(FeConstants.text)) { + return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); + } + } + + String lowerCasePath = path.toLowerCase(); + if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerCasePath.endsWith(".gz")) { + return TFileFormatType.FORMAT_CSV_GZ; + } else if (lowerCasePath.endsWith(".bz2")) { + return TFileFormatType.FORMAT_CSV_BZ2; + } else if (lowerCasePath.endsWith(".lz4")) { + return TFileFormatType.FORMAT_CSV_LZ4FRAME; + } else if (lowerCasePath.endsWith(".lzo")) { + return TFileFormatType.FORMAT_CSV_LZOP; + } else if (lowerCasePath.endsWith(".deflate")) { + return TFileFormatType.FORMAT_CSV_DEFLATE; + } else { + return TFileFormatType.FORMAT_CSV_PLAIN; + } + } +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 13c8452293..584054f611 100644 --- 
a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -217,8 +217,8 @@ struct TEsScanRange { } struct TFileTextScanRangeParams { - 1: optional string column_separator_str; - 2: optional string line_delimiter_str; + 1: optional string column_separator; + 2: optional string line_delimiter; } struct TFileScanSlotInfo { @@ -226,20 +226,51 @@ struct TFileScanSlotInfo { 2: optional bool is_file_slot; } -struct TFileScanRangeParams { - 1: optional Types.TFileType file_type; - 2: optional TFileFormatType format_type; - // use src_tuple_id to get all slots from src table include both file slot and partition slot. - 3: optional Types.TTupleId src_tuple_id; - // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot - 4: optional i32 num_of_columns_from_file; - // all selected slots which may compose from file and partiton value. - 5: optional list required_slots; +// describe how to read file +struct TFileAttributes { + 1: optional TFileTextScanRangeParams text_params; + // it's useful when format_type == FORMAT_JSON + 2: optional bool strip_outer_array; + 3: optional string jsonpaths; + 4: optional string json_root; + 5: optional bool num_as_string; + 6: optional bool fuzzy_parse; + 7: optional bool read_json_by_line; + // Whether read line by column definition, only for Hive + 8: optional bool read_by_column_def; + // csv with header type + 9: optional string header_type; +} - 6: optional THdfsParams hdfs_params; - 7: optional TFileTextScanRangeParams text_params; - // properties for file such as s3 information - 8: optional map properties; +struct TFileScanRangeParams { + 1: optional Types.TFileType file_type; + 2: optional TFileFormatType format_type; + // If this is for load job, src point to the source table and dest point to the doris table. + // If this is for query, only dest_tuple_id is set, including both file slot and partition slot.
+ 3: optional Types.TTupleId src_tuple_id; + 4: optional Types.TTupleId dest_tuple_id + // num_of_columns_from_file can split the all_file_slot and all_partition_slot + 5: optional i32 num_of_columns_from_file; + // all selected slots which may compose from file and partition value. + 6: optional list required_slots; + + 7: optional THdfsParams hdfs_params; + // properties for file such as s3 information + 8: optional map properties; + + // The convert expr map for load job + // desc slot id -> expr + 9: optional map expr_of_dest_slot + // This is the mapping of dest slot id and src slot id in load expr + // It excludes the slot id which has the transform expr + 10: optional map dest_sid_to_src_sid_without_trans + + // strictMode is a boolean + // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded + 11: optional bool strict_mode + + 12: list broker_addresses + 13: TFileAttributes file_attributes } struct TFileRangeDesc { @@ -253,7 +284,9 @@ struct TFileRangeDesc { 4: optional list columns_from_path; } -// HDFS file scan range +// TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. +// TFileScanRangeParams: describe how to read and convert file +// list: file location and range struct TFileScanRange { 1: optional list ranges 2: optional TFileScanRangeParams params