diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp new file mode 100644 index 0000000000..aa0eb10078 --- /dev/null +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/file_scan_operator.h" + +#include + +#include + +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/scan_operator.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/scan/vfile_scanner.h" + +namespace doris::pipeline { + +/* Creates one VFileScanner per entry of _scan_ranges, prepares it, and appends it to `scanners`; a failing prepare() is propagated through RETURN_IF_ERROR, otherwise Status::OK() is returned. */ Status FileScanLocalState::_init_scanners(std::list* scanners) { + if (Base::_eos_dependency->read_blocked_by() == nullptr) { /* NOTE(review): presumably a null read_blocked_by() means end-of-stream was already signalled, so no scanners are needed -- confirm */ + return Status::OK(); + } + + auto& p = _parent->cast(); + size_t shard_num = /* the smaller of the configured scanner thread count and the number of scan ranges */ + std::min(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size()); /* NOTE(review): this is 0 when _scan_ranges is empty -- confirm ShardedKVCache accepts a shard count of 0 */ + _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); + for (auto& scan_range : _scan_ranges) { /* every scanner is handed the same _kv_cache and _scanner_profile pointers */ + std::unique_ptr scanner = vectorized::VFileScanner::create_unique( + state(), this, p._limit_per_scanner, + scan_range.scan_range.ext_scan_range.file_scan_range, _scanner_profile.get(), + _kv_cache.get()); + RETURN_IF_ERROR( + scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); + scanners->push_back(std::move(scanner)); /* the scanner is appended only after prepare() succeeded */ + } + return Status::OK(); +} +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h new file mode 100644 index 0000000000..e0c3fca78f --- /dev/null +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include + +#include "common/logging.h" +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/scan_operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris { +class ExecNode; +namespace vectorized { +class VFileScanner; +} // namespace vectorized +} // namespace doris + +namespace doris::pipeline { + +class FileScanOperatorX; +class FileScanLocalState final : public ScanLocalState { +public: + using Parent = FileScanOperatorX; + using Base = ScanLocalState; + ENABLE_FACTORY_CREATOR(FileScanLocalState); + FileScanLocalState(RuntimeState* state, OperatorXBase* parent) + : ScanLocalState(state, parent) {} + + Status _init_scanners(std::list* scanners) override; + void set_scan_ranges(const std::vector& scan_ranges) override { + _scan_ranges = scan_ranges; + } + int parent_id() { return _parent->id(); } + +private: + std::vector _scan_ranges; + // An in-memory cache to save some common components + // of this scan node, e.g.: + // 1. iceberg delete file + // 2. 
parquet file meta + // KVCache _kv_cache; + std::unique_ptr _kv_cache; +}; + +class FileScanOperatorX final : public ScanOperatorX { +public: + FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanOperatorX(pool, tnode, descs) { + _output_tuple_id = tnode.file_scan_node.tuple_id; + _id = tnode.node_id; + } + +private: + friend class FileScanLocalState; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index ddc966edb6..bd304d87b5 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -19,9 +19,11 @@ #include +#include #include #include "pipeline/exec/es_scan_operator.h" +#include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/olap_scan_operator.h" @@ -1204,6 +1206,11 @@ TPushAggOp::type ScanLocalState::get_push_down_agg_type() { return _parent->cast()._push_down_agg_type; } +template +int64_t ScanLocalState::get_push_down_count() { + return _parent->cast()._push_down_count; +} + template int64_t ScanLocalState::limit_per_scanner() { return _parent->cast()._limit_per_scanner; @@ -1271,6 +1278,9 @@ ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode& _should_run_serial = true; } } + if (tnode.__isset.push_down_count) { + _push_down_count = tnode.push_down_count; + } } template @@ -1436,6 +1446,8 @@ template class ScanOperatorX; template class ScanLocalState; template class ScanOperatorX; template class ScanLocalState; +template class ScanOperatorX; +template class ScanLocalState; template class ScanOperatorX; template class ScanLocalState; template class ScanLocalState; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index ba9cee464d..284f8afb36 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -19,6 
+19,7 @@ #include +#include #include #include "common/status.h" @@ -139,6 +140,8 @@ public: virtual TPushAggOp::type get_push_down_agg_type() = 0; + virtual int64_t get_push_down_count() = 0; + [[nodiscard]] std::string get_name() { return _parent->get_name(); } protected: @@ -220,6 +223,8 @@ class ScanLocalState : public ScanLocalStateBase { TPushAggOp::type get_push_down_agg_type() override; + int64_t get_push_down_count() override; + protected: template friend class ScanOperatorX; @@ -427,6 +432,7 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } + int64_t get_push_down_count() const { return _push_down_count; } using OperatorX::id; protected: @@ -464,6 +470,9 @@ protected: std::vector _runtime_filter_descs; TPushAggOp::type _push_down_agg_type; + + // Record the value of the aggregate function 'count' from doris's be + int64_t _push_down_count = -1; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 672e585db7..47d7731344 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -31,6 +31,7 @@ #include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" +#include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" @@ -552,6 +553,7 @@ DECLARE_OPERATOR_X(PartitionSortSinkLocalState) DECLARE_OPERATOR_X(HashJoinProbeLocalState) DECLARE_OPERATOR_X(OlapScanLocalState) DECLARE_OPERATOR_X(JDBCScanLocalState) +DECLARE_OPERATOR_X(FileScanLocalState) DECLARE_OPERATOR_X(EsScanLocalState) DECLARE_OPERATOR_X(AnalyticLocalState) DECLARE_OPERATOR_X(SortLocalState) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 
7a0cc80556..7c67b1a07b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -54,6 +54,7 @@ #include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" +#include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" @@ -563,6 +564,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } + case doris::TPlanNodeType::FILE_SCAN_NODE: { + op.reset(new FileScanOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + break; + } case TPlanNodeType::ES_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: { op.reset(new EsScanOperatorX(pool, tnode, descs)); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index a7bacdd96d..c91e1ecbd0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -78,6 +78,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams std::vector no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->id(), no_scan_ranges); + for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& deps = get_upstream_dependency(_operators[op_idx]->id()); std::vector infos; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index dd96ef88a4..cfd91ba7bc 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -116,6 +116,35 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t _is_load = (_input_tuple_desc != nullptr); } 
+VFileScanner::VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* local_state, + int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile, + ShardedKVCache* kv_cache) + : VScanner(state, local_state, limit, profile), + _ranges(scan_range.ranges), + _next_range(0), + _cur_reader(nullptr), + _cur_reader_eof(false), + _kv_cache(kv_cache), + _strict_mode(false) { + if (scan_range.params.__isset.strict_mode) { + _strict_mode = scan_range.params.strict_mode; + } + + if (state->get_query_ctx() != nullptr && + state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) { + _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]); + } else { + CHECK(scan_range.__isset.params); + _params = &(scan_range.params); + } + + // For a load scanner, there are both input and output tuples. + // For a query scanner, there is only an output tuple. + _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id); + _real_tuple_desc = _input_tuple_desc == nullptr ? 
_output_tuple_desc : _input_tuple_desc; + _is_load = (_input_tuple_desc != nullptr); +} + Status VFileScanner::prepare( const VExprContextSPtrs& conjuncts, std::unordered_map* colname_to_value_range, @@ -123,20 +152,37 @@ Status VFileScanner::prepare( RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts)); _colname_to_value_range = colname_to_value_range; _col_name_to_slot_id = colname_to_slot_id; - - _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime"); - _open_reader_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerOpenReaderTime"); - _cast_to_input_block_timer = - ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime"); - _fill_path_columns_timer = - ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime"); - _fill_missing_columns_timer = - ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime"); - _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer"); - _convert_to_output_block_timer = - ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime"); - _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT); - _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); + if (get_parent() != nullptr) { + _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime"); + _open_reader_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerOpenReaderTime"); + _cast_to_input_block_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime"); + _fill_path_columns_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime"); + _fill_missing_columns_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime"); + _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer"); + _convert_to_output_block_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime"); + 
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT); + _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); + } else { + _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime"); + _open_reader_timer = + ADD_TIMER(_local_state->scanner_profile(), "FileScannerOpenReaderTime"); + _cast_to_input_block_timer = + ADD_TIMER(_local_state->scanner_profile(), "FileScannerCastInputBlockTime"); + _fill_path_columns_timer = + ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillPathColumnTime"); + _fill_missing_columns_timer = + ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillMissingColumnTime"); + _pre_filter_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerPreFilterTimer"); + _convert_to_output_block_timer = + ADD_TIMER(_local_state->scanner_profile(), "FileScannerConvertOuputBlockTime"); + _empty_file_counter = + ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT); + _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT); + } _file_cache_statistics.reset(new io::FileCacheStatistics()); _io_ctx.reset(new io::IOContext()); @@ -268,7 +314,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo if (read_rows > 0) { // If the push_down_agg_type is COUNT, no need to do the rest, // because we only save a number in block. - if (_parent->get_push_down_agg_type() != TPushAggOp::type::COUNT) { + if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) { // Convert the src block columns type to string in-place. 
RETURN_IF_ERROR(_cast_to_input_block(block)); // FileReader can fill partition and missing columns itself @@ -705,9 +751,9 @@ Status VFileScanner::_get_next_reader() { if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { std::unique_ptr iceberg_reader = - IcebergTableReader::create_unique( - std::move(parquet_reader), _profile, _state, *_params, range, - _kv_cache, _io_ctx.get(), _parent->get_push_down_count()); + IcebergTableReader::create_unique(std::move(parquet_reader), _profile, + _state, *_params, range, _kv_cache, + _io_ctx.get(), _get_push_down_count()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), @@ -815,7 +861,7 @@ Status VFileScanner::_get_next_reader() { _name_to_col_type.clear(); _missing_cols.clear(); _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); - _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type()); + _cur_reader->set_push_down_agg_type(_get_push_down_agg_type()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 1043b9eb00..fc9760b961 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -32,6 +32,7 @@ #include "exec/olap_common.h" #include "exec/text_converter.h" #include "io/io_common.h" +#include "pipeline/exec/file_scan_operator.h" #include "runtime/descriptors.h" #include "util/runtime_profile.h" #include "vec/common/schema_util.h" @@ -67,6 +68,10 @@ public: const TFileScanRange& scan_range, RuntimeProfile* profile, ShardedKVCache* kv_cache); + VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit, + const TFileScanRange& scan_range, RuntimeProfile* profile, + ShardedKVCache* kv_cache); + 
Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; @@ -208,5 +213,21 @@ private: _counter.num_rows_unselected = 0; _counter.num_rows_filtered = 0; } + + /* Returns the push-down aggregate type from _parent when get_parent() is non-null, otherwise from _local_state. NOTE(review): presumably get_parent() is null for scanners constructed from a pipeline local state -- confirm against VScanner. */ TPushAggOp::type _get_push_down_agg_type() { + if (get_parent() != nullptr) { + return _parent->get_push_down_agg_type(); + } else { + return _local_state->get_push_down_agg_type(); + } + } + + /* Returns get_push_down_count() from _parent when get_parent() is non-null, otherwise from _local_state. */ int64_t _get_push_down_count() { + if (get_parent() != nullptr) { + return _parent->get_push_down_count(); + } else { + return _local_state->get_push_down_count(); + } + } }; } // namespace doris::vectorized