diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index c4ba3d611c..d884845ab1 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -55,7 +55,6 @@ Status NewOlapScanNode::prepare(RuntimeState* state) { Status NewOlapScanNode::_init_profile() { RETURN_IF_ERROR(VScanNode::_init_profile()); - _num_disks_accessed_counter = ADD_COUNTER(_runtime_profile, "NumDiskAccess", TUnit::UNIT); _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT); // 1. init segment profile @@ -405,11 +404,10 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { } // ranges constructed from scan keys - std::vector> cond_ranges; - RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges)); + RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges)); // if we can't get ranges from conditions, we give it a total range - if (cond_ranges.empty()) { - cond_ranges.emplace_back(new doris::OlapScanRange()); + if (_cond_ranges.empty()) { + _cond_ranges.emplace_back(new doris::OlapScanRange()); } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); @@ -462,25 +460,20 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { } } - std::unordered_set disk_set; auto build_new_scanner = [&](const TPaloScanRange& scan_range, const std::vector& key_ranges, const std::vector& rs_readers, const std::vector>& rs_reader_seg_offsets) { NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner, - _olap_scan_node.is_preaggregation, + _olap_scan_node.is_preaggregation, scan_range, + key_ranges, rs_readers, rs_reader_seg_offsets, _need_agg_finalize, _scanner_profile.get()); scanner->set_compound_filters(_compound_filters); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(), - _olap_filters, _filter_predicates, _push_down_functions, - _common_vexpr_ctxs_pushdown.get(), rs_readers, - rs_reader_seg_offsets)); scanners->push_back((VScanner*)scanner); - disk_set.insert(scanner->scan_disk()); return Status::OK(); }; if (is_duplicate_key) { @@ -489,7 +482,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1); for (int i = 0; i < _scan_ranges.size(); ++i) { auto& scan_range = _scan_ranges[i]; - std::vector>* ranges = &cond_ranges; + std::vector>* ranges = &_cond_ranges; int num_ranges = ranges->size(); std::vector scanner_ranges(num_ranges); for (int j = 0; j < num_ranges; ++j) { @@ -553,7 +546,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { true); RETURN_IF_ERROR(status); - std::vector>* ranges = &cond_ranges; + std::vector>* ranges = &_cond_ranges; int size_based_scanners_per_tablet = 1; if (config::doris_scan_range_max_mb > 0) { @@ -577,10 +570,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {})); } } - COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); } - // telemetry::set_span_attribute(span, _num_disks_accessed_counter); - // telemetry::set_span_attribute(span, _num_scanners); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index b1c95804b7..48fde8436b 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -65,6 +65,7 @@ private: private: TOlapScanNode _olap_scan_node; std::vector> _scan_ranges; + std::vector> _cond_ranges; OlapScanKeys _scan_keys; std::vector _olap_filters; // _compound_filters store conditions in the one compound relationship in conjunct expr tree except leaf node of `and` node, diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 3aaaf6add4..59b72eea8b 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -24,12 +24,21 @@ namespace doris::vectorized { NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, - bool aggregation, bool need_agg_finalize, RuntimeProfile* profile) + bool aggregation, const TPaloScanRange& scan_range, + const std::vector& key_ranges, + const std::vector& rs_readers, + const std::vector>& rs_reader_seg_offsets, + bool need_agg_finalize, RuntimeProfile* profile) : VScanner(state, static_cast(parent), limit, profile), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1) { + _version(-1), + _scan_range(scan_range), + _key_ranges(key_ranges) { + _tablet_reader_params.rs_readers = rs_readers; + _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets; _tablet_schema = std::make_shared(); + _is_init = false; } static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, @@ -46,19 +55,14 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, return read_columns_string; } -Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, - const std::vector& key_ranges, - VExprContext** vconjunct_ctx_ptr, - const std::vector& filters, - const FilterPredicates& filter_predicates, - const std::vector& function_filters, - VExprContext** common_vexpr_ctxs_pushdown, - const std::vector& rs_readers, - const std::vector>& rs_reader_seg_offsets) { - RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); - if (common_vexpr_ctxs_pushdown != nullptr) { +Status NewOlapScanner::init() { + _is_init = true; + auto parent = static_cast(_parent); + RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr.get())); + if (parent->_common_vexpr_ctxs_pushdown != nullptr) { // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary. - RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, &_common_vexpr_ctxs_pushdown)); + RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown) + ->clone(_state, &_common_vexpr_ctxs_pushdown)); } // set limit to reduce end of rowset and segment mem use @@ -69,8 +73,8 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, : std::min(static_cast(_state->batch_size()), _parent->limit())); // Get olap table - TTabletId tablet_id = scan_range.tablet_id; - _version = strtoul(scan_range.version.c_str(), nullptr, 10); + TTabletId tablet_id = _scan_range.tablet_id; + _version = strtoul(_scan_range.version.c_str(), nullptr, 10); { auto [tablet, status] = StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, true); @@ -114,7 +118,7 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, { std::shared_lock rdlock(_tablet->get_header_lock()); - if (rs_readers.empty()) { + if (_tablet_reader_params.rs_readers.empty()) { const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); if (rowset == nullptr) { std::stringstream ss; @@ -137,14 +141,12 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, << ", backend=" << BackendOptions::get_localhost(); return Status::InternalError(ss.str()); } - } else { - _tablet_reader_params.rs_readers = rs_readers; - _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets; } // Initialize tablet_reader_params - RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, filter_predicates, - function_filters)); + RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges, parent->_olap_filters, + parent->_filter_predicates, + parent->_push_down_functions)); } } diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 2a04e021db..4b05880bcd 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -35,20 +35,17 @@ struct FilterPredicates; class NewOlapScanner : public VScanner { public: NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, + const TPaloScanRange& scan_range, const std::vector& key_ranges, + const std::vector& rs_readers, + const std::vector>& rs_reader_seg_offsets, bool need_agg_finalize, RuntimeProfile* profile); + Status init() override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status prepare(const TPaloScanRange& scan_range, const std::vector& key_ranges, - VExprContext** vconjunct_ctx_ptr, const std::vector& filters, - const FilterPredicates& filter_predicates, - const std::vector& function_filters, - VExprContext** common_vexpr_ctxs_pushdown, - const std::vector& rs_readers = {}, - const std::vector>& rs_reader_seg_offsets = {}); - const std::string& scan_disk() const { return _tablet->data_dir()->path(); } void set_compound_filters(const std::vector& compound_filters); @@ -75,6 +72,8 @@ private: TabletSchemaSPtr _tablet_schema; TabletSharedPtr _tablet; int64_t _version; + const TPaloScanRange& _scan_range; + std::vector _key_ranges; TabletReader::ReaderParams _tablet_reader_params; std::unique_ptr _tablet_reader; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 38121538f7..a6a76e57d7 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -260,7 +260,14 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool eos = false; RuntimeState* state = ctx->state(); DCHECK(nullptr != state); - if (!scanner->is_open()) { + if (!scanner->is_init()) { + status = scanner->init(); + if (!status.ok()) { + ctx->set_status_on_error(status); + eos = true; + } + } + if (!eos && !scanner->is_open()) { status = scanner->open(state); if (!status.ok()) { ctx->set_status_on_error(status); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 3e9459071b..1286baa573 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -1363,7 +1363,6 @@ Status VScanNode::_prepare_scanners() { COUNTER_SET(_num_scanners, static_cast(scanners.size())); RETURN_IF_ERROR(_start_scanners(scanners)); } - return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 88cac3db42..26248d9b54 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -39,7 +39,9 @@ class VScanner { public: VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile); - virtual ~VScanner() {} + virtual ~VScanner() = default; + + virtual Status init() { return Status::OK(); } virtual Status open(RuntimeState* state) { return Status::OK(); } @@ -67,6 +69,8 @@ public: int64_t get_rows_read() const { return _num_rows_read; } + bool is_init() const { return _is_init; } + Status try_append_late_arrival_runtime_filter(); // Call start_wait_worker_timer() when submit the scanner to the thread pool. @@ -179,6 +183,8 @@ protected: // set to true after decrease the "_num_unfinished_scanners" in scanner context bool _is_counted_down = false; + bool _is_init = true; + ScannerCounter _counter; int64_t _per_scanner_timer = 0; };