diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index efea42cb70..d684dead0a 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -137,7 +137,7 @@ Status NewEsScanNode::_process_conjuncts() { return Status::OK(); } -Status NewEsScanNode::_init_scanners(std::list* scanners) { +Status NewEsScanNode::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { _eos = true; return Status::OK(); @@ -163,13 +163,12 @@ Status NewEsScanNode::_init_scanners(std::list* scanners) { properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build( properties, _column_names, _docvalue_context, &doc_value_mode); - NewEsScanner* scanner = - new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, properties, - _docvalue_context, doc_value_mode, _state->runtime_profile()); + std::shared_ptr scanner = NewEsScanner::create_shared( + _state, this, _limit_per_scanner, _tuple_id, properties, _docvalue_context, + doc_value_mode, _state->runtime_profile()); - _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); - scanners->push_back(static_cast(scanner)); + scanners->push_back(scanner); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h index a8e48b006c..13b337290f 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.h +++ b/be/src/vec/exec/scan/new_es_scan_node.h @@ -60,7 +60,7 @@ public: protected: Status _init_profile() override; Status _process_conjuncts() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; private: TupleId _tuple_id; diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index db1c479712..e855f514d4 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -24,6 +24,7 @@ #include #include +#include "common/factory_creator.h" #include "common/global_types.h" #include "common/status.h" #include "exec/es/es_scan_reader.h" @@ -47,6 +48,8 @@ namespace doris::vectorized { class NewEsScanNode; class NewEsScanner : public VScanner { + ENABLE_FACTORY_CREATOR(NewEsScanner); + public: NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id, const std::map& properties, diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index aea5571822..3bbc8fc978 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -96,7 +96,7 @@ Status NewFileScanNode::_process_conjuncts() { return Status::OK(); } -Status NewFileScanNode::_init_scanners(std::list* scanners) { +Status NewFileScanNode::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { _eos = true; return Status::OK(); @@ -107,14 +107,13 @@ Status NewFileScanNode::_init_scanners(std::list* scanners) { std::min(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size()); _kv_cache.reset(new ShardedKVCache(shard_num)); for (auto& scan_range : _scan_ranges) { - VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, - scan_range.scan_range.ext_scan_range.file_scan_range, - runtime_profile(), _kv_cache.get()); - _scanner_pool.add(scanner); - RETURN_IF_ERROR(((VFileScanner*)scanner) - ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range, - &_colname_to_slot_id)); - scanners->push_back(scanner); + std::unique_ptr scanner = + VFileScanner::create_unique(_state, this, _limit_per_scanner, + scan_range.scan_range.ext_scan_range.file_scan_range, + runtime_profile(), _kv_cache.get()); + RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range, + &_colname_to_slot_id)); + scanners->push_back(std::move(scanner)); } return Status::OK(); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 5cd7458cae..7654d25bb2 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -52,7 +52,7 @@ public: protected: Status _init_profile() override; Status _process_conjuncts() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; private: std::vector _scan_ranges; diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index a4dff27f34..f9f794ccb6 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -61,16 +61,15 @@ Status NewJdbcScanNode::_init_profile() { return Status::OK(); } -Status NewJdbcScanNode::_init_scanners(std::list* scanners) { +Status NewJdbcScanNode::_init_scanners(std::list* scanners) { if (_eos == true) { return Status::OK(); } - NewJdbcScanner* scanner = - new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string, - _table_type, _state->runtime_profile()); - _scanner_pool.add(scanner); + std::unique_ptr scanner = + NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, _tuple_id, + _query_string, _table_type, _state->runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); - scanners->push_back(static_cast(scanner)); + scanners->push_back(std::move(scanner)); return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h index 7e109d1dd8..54a8dc7d3d 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.h +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h @@ -44,7 +44,7 @@ public: protected: Status _init_profile() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; private: std::string _table_name; diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index 1cf06387ff..ef436d5aec 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -23,6 +23,7 @@ #include #include +#include "common/factory_creator.h" #include "common/global_types.h" #include "common/status.h" #include "util/runtime_profile.h" @@ -39,6 +40,8 @@ class NewJdbcScanNode; class VExprContext; class NewJdbcScanner : public VScanner { + ENABLE_FACTORY_CREATOR(NewJdbcScanner); + public: friend class JdbcConnector; diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 170122751e..17d7f7784f 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -61,15 +61,14 @@ Status NewOdbcScanNode::_init_profile() { return Status::OK(); } -Status NewOdbcScanNode::_init_scanners(std::list* scanners) { +Status NewOdbcScanNode::_init_scanners(std::list* scanners) { if (_eos == true) { return Status::OK(); } - NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node, - _state->runtime_profile()); - _scanner_pool.add(scanner); + std::shared_ptr scanner = NewOdbcScanner::create_shared( + _state, this, _limit_per_scanner, _odbc_scan_node, _state->runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); - scanners->push_back(static_cast(scanner)); + scanners->push_back(scanner); return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h index a6107ac402..f76d2b645b 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.h +++ b/be/src/vec/exec/scan/new_odbc_scan_node.h @@ -50,7 +50,7 @@ public: protected: Status _init_profile() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; private: std::string _table_name; diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h index 7097215b0b..62043f6ff7 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.h +++ b/be/src/vec/exec/scan/new_odbc_scanner.h @@ -22,6 +22,7 @@ #include #include +#include "common/factory_creator.h" #include "common/global_types.h" #include "common/status.h" #include "exec/odbc_connector.h" @@ -43,6 +44,8 @@ class VExprContext; namespace doris::vectorized { class NewOdbcScanner : public VScanner { + ENABLE_FACTORY_CREATOR(NewOdbcScanner); + public: NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit, const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index f9965260b3..6e94e152b2 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -423,7 +423,7 @@ std::string NewOlapScanNode::get_name() { return fmt::format("VNewOlapScanNode({0})", _olap_scan_node.table_name); } -Status NewOlapScanNode::_init_scanners(std::list* scanners) { +Status NewOlapScanNode::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { _eos = true; return Status::OK(); @@ -506,16 +506,13 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { const std::vector& key_ranges, const std::vector& rs_readers, const std::vector>& rs_reader_seg_offsets) { - NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner, - _olap_scan_node.is_preaggregation, scan_range, - key_ranges, rs_readers, rs_reader_seg_offsets, - _need_agg_finalize, _scanner_profile.get()); + std::shared_ptr scanner = NewOlapScanner::create_shared( + _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, + key_ranges, rs_readers, rs_reader_seg_offsets, _need_agg_finalize, + _scanner_profile.get()); scanner->set_compound_filters(_compound_filters); - // add scanner to pool before doing prepare. - // so that scanner can be automatically deconstructed if prepare failed. - _scanner_pool.add(scanner); - scanners->push_back((VScanner*)scanner); + scanners->push_back(scanner); return Status::OK(); }; if (is_duplicate_key) { diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 441fd4a3ae..146639465e 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -86,7 +86,7 @@ protected: bool _should_push_down_common_expr() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; private: Status _build_key_ranges_and_filters(); diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 5eebf5d27c..29178c1285 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -26,6 +26,7 @@ #include #include +#include "common/factory_creator.h" #include "common/status.h" #include "olap/data_dir.h" #include "olap/reader.h" @@ -49,6 +50,8 @@ struct FilterPredicates; class Block; class NewOlapScanner : public VScanner { + ENABLE_FACTORY_CREATOR(NewOlapScanner); + public: NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, const TPaloScanRange& scan_range, const std::vector& key_ranges, diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index e043582c9d..f6a9504fc3 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -24,11 +24,13 @@ namespace doris { namespace pipeline { class PipScannerContext : public vectorized::ScannerContext { + ENABLE_FACTORY_CREATOR(PipScannerContext); + public: PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent, const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit, + const std::list& scanners, int64_t limit, int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids) : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc, scanners, limit, max_bytes_in_blocks_queue), diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 99fee6fd24..d5c200581b 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -44,7 +44,7 @@ namespace doris::vectorized { ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* input_tuple_desc, const doris::TupleDescriptor* output_tuple_desc, - const std::list& scanners_, int64_t limit_, + const std::list& scanners_, int64_t limit_, int64_t max_bytes_in_blocks_queue_) : _state(state_), _parent(parent), @@ -242,7 +242,7 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", "; } // Only unfinished scanners here - for (auto scanner : _scanners) { + for (auto& scanner : _scanners) { // Scanners are in ObjPool in ScanNode, // so no need to delete them here. // Add per scanner running time before close them @@ -257,7 +257,7 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); } // Only unfinished scanners here - for (auto scanner : _scanners) { + for (auto& scanner : _scanners) { scanner->close(state); // Scanners are in ObjPool in ScanNode, // so no need to delete them here. @@ -314,7 +314,7 @@ void ScannerContext::reschedule_scanner_ctx() { } } -void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { +void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { { std::unique_lock l(_scanners_lock); _scanners.push_front(scanner); @@ -344,7 +344,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { _ctx_finish_cv.notify_one(); } -void ScannerContext::get_next_batch_of_scanners(std::list* current_run) { +void ScannerContext::get_next_batch_of_scanners(std::list* current_run) { // 1. Calculate how many scanners should be scheduled at this run. int thread_slot_num = 0; { @@ -364,7 +364,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current_ru { std::unique_lock l(_scanners_lock); for (int i = 0; i < thread_slot_num && !_scanners.empty();) { - auto scanner = _scanners.front(); + VScannerSPtr scanner = _scanners.front(); _scanners.pop_front(); if (scanner->need_to_close()) { _finished_scanner_runtime.push_back(scanner->get_time_cost_ns()); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 05942ad73b..0b51758399 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -27,10 +27,12 @@ #include #include +#include "common/factory_creator.h" #include "common/status.h" #include "util/lock.h" #include "util/runtime_profile.h" #include "vec/core/block.h" +#include "vec/exec/scan/vscanner.h" namespace doris { @@ -53,10 +55,13 @@ class ScannerScheduler; // ScannerScheduler schedules a ScannerContext at a time, // and submits the Scanners to the scanner thread pool for data scanning. class ScannerContext { + ENABLE_FACTORY_CREATOR(ScannerContext); + public: ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc, - const TupleDescriptor* output_tuple_desc, const std::list& scanners_, - int64_t limit_, int64_t max_bytes_in_blocks_queue_); + const TupleDescriptor* output_tuple_desc, + const std::list& scanners_, int64_t limit_, + int64_t max_bytes_in_blocks_queue_); virtual ~ScannerContext() = default; Status init(); @@ -74,7 +79,7 @@ public: // When a scanner complete a scan, this method will be called // to return the scanner to the list for next scheduling. - void push_back_scanner_and_reschedule(VScanner* scanner); + void push_back_scanner_and_reschedule(VScannerSPtr scanner); bool set_status_on_error(const Status& status); @@ -110,7 +115,7 @@ public: int get_num_scheduling_ctx() const { return _num_scheduling_ctx; } - void get_next_batch_of_scanners(std::list* current_run); + void get_next_batch_of_scanners(std::list* current_run); void clear_and_join(VScanNode* node, RuntimeState* state); @@ -223,7 +228,7 @@ protected: // and then if the scanner is not finished, will be pushed back to this list. // Not need to protect by lock, because only one scheduler thread will access to it. doris::Mutex _scanners_lock; - std::list _scanners; + std::list _scanners; std::vector _finished_scanner_runtime; std::vector _finished_scanner_rows_read; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 99d16121d1..2ec21ced9c 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -159,7 +159,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { return; } - std::list this_run; + std::list this_run; ctx->get_next_batch_of_scanners(&this_run); if (this_run.empty()) { // There will be 2 cases when this_run is empty: @@ -260,7 +260,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { } void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, - VScanner* scanner) { + VScannerSPtr scanner) { auto tracker_config = [&] { SCOPED_ATTACH_TASK(scanner->runtime_state()); Thread::set_self_name("_scanner_scan"); @@ -334,8 +334,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. // Will remove this after file reader refactor. - if (!status.ok() && (typeid(*scanner) != typeid(doris::vectorized::VFileScanner) || - (typeid(*scanner) == typeid(doris::vectorized::VFileScanner) && + if (!status.ok() && (scanner->get_name() != doris::vectorized::VFileScanner::NAME || + (scanner->get_name() == doris::vectorized::VFileScanner::NAME && !status.is()))) { LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string(); break; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 6f03ec5ea1..d8f51d4ad0 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "util/threadpool.h" +#include "vec/exec/scan/vscanner.h" namespace doris { class ExecEnv; @@ -73,7 +74,7 @@ private: // schedule scanners in a certain ScannerContext void _schedule_scanners(ScannerContext* ctx); // execution thread function - void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner); + void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner); private: // Scheduling queue number. diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index ca42e9c1a6..fc1da0ec6c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -26,6 +26,7 @@ #include #include +#include "common/factory_creator.h" #include "common/global_types.h" #include "common/status.h" #include "exec/olap_common.h" @@ -57,7 +58,11 @@ namespace doris::vectorized { class NewFileScanNode; class VFileScanner : public VScanner { + ENABLE_FACTORY_CREATOR(VFileScanner); + public: + static constexpr const char* NAME = "VFileScanner"; + VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile, ShardedKVCache* kv_cache); @@ -66,11 +71,12 @@ public: Status close(RuntimeState* state) override; -public: Status prepare(VExprContext** vconjunct_ctx_ptr, std::unordered_map* colname_to_value_range, const std::unordered_map* colname_to_slot_id); + std::string get_name() override { return VFileScanner::NAME; } + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp index b3744311ac..deb4980f23 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.cpp +++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -58,16 +58,15 @@ Status VMetaScanNode::_init_profile() { return Status::OK(); } -Status VMetaScanNode::_init_scanners(std::list* scanners) { +Status VMetaScanNode::_init_scanners(std::list* scanners) { if (_eos == true) { return Status::OK(); } for (auto& scan_range : _scan_ranges) { - VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id, scan_range, - _limit_per_scanner, runtime_profile()); - _scanner_pool.add(scanner); + std::shared_ptr scanner = VMetaScanner::create_shared( + _state, this, _tuple_id, scan_range, _limit_per_scanner, runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); - scanners->push_back(static_cast(scanner)); + scanners->push_back(scanner); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h b/be/src/vec/exec/scan/vmeta_scan_node.h index 8f8554883b..b432d74760 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.h +++ b/be/src/vec/exec/scan/vmeta_scan_node.h @@ -51,7 +51,7 @@ public: private: Status _init_profile() override; - Status _init_scanners(std::list* scanners) override; + Status _init_scanners(std::list* scanners) override; Status _process_conjuncts() override; TupleId _tuple_id; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index c875d90b3c..e1de191342 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -22,6 +22,7 @@ #include +#include "common/factory_creator.h" #include "common/global_types.h" #include "common/status.h" #include "vec/data_types/data_type.h" @@ -46,6 +47,8 @@ class VMetaScanNode; namespace doris::vectorized { class VMetaScanner : public VScanner { + ENABLE_FACTORY_CREATOR(VMetaScanner); + public: VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id, const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index d13d4fc9c8..5482df2e20 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -295,15 +295,15 @@ Status VScanNode::_init_profile() { return Status::OK(); } -Status VScanNode::_start_scanners(const std::list& scanners) { +Status VScanNode::_start_scanners(const std::list& scanners) { if (_is_pipeline_scan) { - _scanner_ctx.reset(new pipeline::PipScannerContext( + _scanner_ctx = pipeline::PipScannerContext::create_shared( _state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), - _state->query_options().mem_limit / 20, _col_distribute_ids)); + _state->query_options().mem_limit / 20, _col_distribute_ids); } else { - _scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc, - scanners, limit(), - _state->query_options().mem_limit / 20)); + _scanner_ctx = ScannerContext::create_shared(_state, this, _input_tuple_desc, + _output_tuple_desc, scanners, limit(), + _state->query_options().mem_limit / 20); } RETURN_IF_ERROR(_scanner_ctx->init()); return Status::OK(); @@ -470,7 +470,6 @@ void VScanNode::release_resource(RuntimeState* state) { if (_common_vexpr_ctxs_pushdown) { (*_common_vexpr_ctxs_pushdown)->close(state); } - _scanner_pool.clear(); ExecNode::release_resource(state); } @@ -1408,7 +1407,7 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* } Status VScanNode::_prepare_scanners() { - std::list scanners; + std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); if (scanners.empty()) { _eos = true; diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 231ce9fb19..13361bdde8 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -44,6 +44,7 @@ #include "util/lock.h" #include "util/runtime_profile.h" #include "vec/exec/scan/scanner_context.h" +#include "vec/exec/scan/vscanner.h" #include "vec/runtime/shared_scanner_controller.h" namespace doris { @@ -197,7 +198,7 @@ protected: // predicate conditions, and scheduling strategy. // So this method needs to be implemented separately by the subclass of ScanNode. // Finally, a set of scanners that have been prepared are returned. - virtual Status _init_scanners(std::list* scanners) { return Status::OK(); } + virtual Status _init_scanners(std::list* scanners) { return Status::OK(); } // Different data sources can implement the following 3 methods to determine whether a predicate // can be pushed down to the data source. @@ -276,8 +277,6 @@ protected: // Each scan node will generates a ScannerContext to manage all Scanners. // See comments of ScannerContext for more details std::shared_ptr _scanner_ctx; - // Save all scanner objects. - ObjectPool _scanner_pool; // indicate this scan node has no more data to return bool _eos = false; @@ -445,7 +444,7 @@ private: const std::string& fn_name, int slot_ref_child = -1); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list& scanners); + Status _start_scanners(const std::list& scanners); }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 1733ee3acf..fe9744ba65 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -64,6 +64,8 @@ public: virtual Status close(RuntimeState* state); + virtual std::string get_name() { return ""; } + protected: // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; @@ -204,4 +206,6 @@ protected: int64_t _per_scanner_timer = 0; }; +using VScannerSPtr = std::shared_ptr; + } // namespace doris::vectorized