From a16cf0e2c8d810f87259a7bfeac0c5c7b38109ee Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 30 Aug 2022 10:55:48 +0800 Subject: [PATCH] [feature-wip](scan) add profile for new olap scan node (#12042) Copy most of profiles from VOlapScanNode and VOlapScanner to NewOlapScanNode and NewOlapScanner. Fix some blocking bug of new scan framework. TODO: Memtracker Opentelemetry spen The new framework is still disabled by default, so it will not effect other feature. --- be/src/exec/exec_node.cpp | 17 +-- be/src/exec/exec_node.h | 5 + be/src/runtime/plan_fragment_executor.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 2 +- be/src/runtime/runtime_state.h | 2 + be/src/util/runtime_profile.h | 1 + be/src/vec/exec/scan/new_olap_scan_node.cpp | 109 ++++++++++++++++++-- be/src/vec/exec/scan/new_olap_scan_node.h | 67 ++++++++++++ be/src/vec/exec/scan/new_olap_scanner.cpp | 101 ++++++++++++++++-- be/src/vec/exec/scan/new_olap_scanner.h | 9 +- be/src/vec/exec/scan/scanner_context.cpp | 27 ++++- be/src/vec/exec/scan/scanner_context.h | 15 ++- be/src/vec/exec/scan/scanner_scheduler.cpp | 22 ++-- be/src/vec/exec/scan/vscan_node.cpp | 53 +++++++++- be/src/vec/exec/scan/vscan_node.h | 36 ++++++- be/src/vec/exec/scan/vscanner.cpp | 39 ++++--- be/src/vec/exec/scan/vscanner.h | 26 ++++- be/src/vec/exec/volap_scan_node.cpp | 5 + be/src/vec/exec/volap_scan_node.h | 2 + be/src/vec/exprs/vexpr_context.cpp | 1 + 20 files changed, 475 insertions(+), 66 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 88cbb8cdf1..0b643135b0 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -180,13 +180,12 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list* } Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { - std::string profile; - if (state && state->enable_vectorized_exec()) { - profile = "V" + print_plan_node_type(tnode.node_type); - } else { - profile = print_plan_node_type(tnode.node_type); - } - init_runtime_profile(profile); +#ifdef BE_TEST + _is_vec = true; +#else + _is_vec = state->enable_vectorized_exec(); +#endif + init_runtime_profile(get_name()); if (tnode.__isset.vconjunct) { _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); @@ -766,4 +765,8 @@ Status ExecNode::get_next(RuntimeState* state, vectorized::Block* block, bool* e return Status::NotSupported("Not Implemented get block"); } +std::string ExecNode::get_name() { + return (_is_vec ? "V" : "") + print_plan_node_type(_type); +} + } // namespace doris diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 6695faf7e7..5b700090b2 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -192,6 +192,8 @@ public: OpentelemetrySpan get_next_span() { return _get_next_span; } + virtual std::string get_name(); + // Extract node id from p->name(). static int get_node_id_from_profile(RuntimeProfile* p); @@ -311,6 +313,9 @@ protected: /// reservations pool in Close(). BufferPool::ClientHandle _buffer_pool_client; + // Set to true if this is a vectorized exec node. + bool _is_vec = false; + ExecNode* child(int i) { return _children[i]; } bool is_closed() const { return _is_closed; } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index ff3a0feed9..9346b947d3 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -61,7 +61,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, _done(false), _prepared(false), _closed(false), - _is_report_success(true), + _is_report_success(false), _is_report_on_cancel(true), _collect_query_statistics_with_every_batch(false), _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR), diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 0134886779..34c9fb33e3 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -92,7 +92,7 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt } else { filter_map = &_producer_map; } - // LOG(INFO) << "regist filter...:" << key << ",role:" << role; + VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role; auto iter = filter_map->find(key); if (iter != filter_map->end()) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 484e7d971b..9f2cff712c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -375,6 +375,8 @@ public: void set_tracer(OpentelemetryTracer&& tracer) { _tracer = std::move(tracer); } + bool enable_profile() const { return _query_options.is_report_success; } + private: // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const std::shared_ptr& block_mgr) { diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index ad3424b326..573aecf55e 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -556,6 +556,7 @@ public: // Update counter when object is destroyed ~ScopedTimer() { + if (_counter == nullptr) return; _sw.stop(); UpdateCounter(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index bd48e288ec..2c0876c5ff 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -19,6 +19,7 @@ #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "util/to_string.h" #include "vec/columns/column_const.h" #include "vec/exec/scan/new_olap_scanner.h" #include "vec/functions/in.h" @@ -41,9 +42,96 @@ Status NewOlapScanNode::prepare(RuntimeState* state) { } Status NewOlapScanNode::_init_profile() { + VScanNode::_init_profile(); + + _num_disks_accessed_counter = ADD_COUNTER(_runtime_profile, "NumDiskAccess", TUnit::UNIT); + _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT); + + // 1. init segment profile + _segment_profile.reset(new RuntimeProfile("SegmentIterator")); + _scanner_profile->add_child(_segment_profile.get(), true, nullptr); + + // 2. init timer and counters + _reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime"); + _read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES); + _read_uncompressed_counter = + ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES); + _block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime"); + _block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT); + _block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime"); + _raw_rows_counter = ADD_COUNTER(_segment_profile, "RawRowsRead", TUnit::UNIT); + _block_convert_timer = ADD_TIMER(_scanner_profile, "BlockConvertTime"); + _block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime"); + _block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime"); + _block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT); + + _rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT); + _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); + _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); + _first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime"); + _first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime"); + _first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT); + + _lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime"); + _lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime"); + _lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT); + + _output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime"); + + _stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT); + _bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT); + _del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT); + _conditions_filtered_counter = + ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT); + _key_range_filtered_counter = + ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT); + + _io_timer = ADD_TIMER(_segment_profile, "IOTimer"); + _decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer"); + + _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); + _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); + + _bitmap_index_filter_counter = + ADD_COUNTER(_segment_profile, "RowsBitmapIndexFiltered", TUnit::UNIT); + _bitmap_index_filter_timer = ADD_TIMER(_segment_profile, "BitmapIndexFilterTimer"); + + _filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT); + _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); + + // for the purpose of debugging or profiling + for (int i = 0; i < GENERAL_DEBUG_COUNT; ++i) { + char name[64]; + snprintf(name, sizeof(name), "GeneralDebugTimer%d", i); + _general_debug_timer[i] = ADD_TIMER(_segment_profile, name); + } return Status::OK(); } +static std::string olap_filter_to_string(const doris::TCondition& condition) { + auto op_name = condition.condition_op; + if (condition.condition_op == "*=") { + op_name = "IN"; + } else if (condition.condition_op == "!*=") { + op_name = "NOT IN"; + } + return fmt::format("{{{} {} {}}}", condition.column_name, op_name, + to_string(condition.condition_values)); +} + +static std::string olap_filters_to_string(const std::vector& filters) { + std::string filters_string; + filters_string += "["; + for (auto it = filters.cbegin(); it != filters.cend(); it++) { + if (it != filters.cbegin()) { + filters_string += ","; + } + filters_string += olap_filter_to_string(*it); + } + filters_string += "]"; + return filters_string; +} + Status NewOlapScanNode::_process_conjuncts() { RETURN_IF_ERROR(VScanNode::_process_conjuncts()); if (_eos) { @@ -92,8 +180,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { } } - // _runtime_profile->add_info_string("PushDownPredicate", olap_filters_to_string(_olap_filters)); - // _runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string()); + _runtime_profile->add_info_string("PushDownPredicates", olap_filters_to_string(_olap_filters)); + _runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string()); VLOG_CRITICAL << _scan_keys.debug_string(); return Status::OK(); @@ -205,13 +293,17 @@ void NewOlapScanNode::set_scan_ranges(const std::vector& scan_ for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); - // COUNTER_UPDATE(_tablet_counter, 1); + COUNTER_UPDATE(_tablet_counter, 1); } // telemetry::set_current_span_attribute(_tablet_counter); return; } +std::string NewOlapScanNode::get_name() { + return fmt::format("VNewOlapScanNode({0})", _olap_scan_node.table_name); +} + Status NewOlapScanNode::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { _eos = true; @@ -228,7 +320,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); - // std::unordered_set disk_set; + std::unordered_set disk_set; for (auto& scan_range : _scan_ranges) { auto tablet_id = scan_range->tablet_id; std::string err; @@ -273,19 +365,14 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { _olap_filters, _bloom_filters_push_down, _push_down_functions)); scanners->push_back((VScanner*)scanner); - // disk_set.insert(scanner->scan_disk()); + disk_set.insert(scanner->scan_disk()); } } - // COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); - // COUNTER_SET(_num_scanners, static_cast(_volap_scanners.size())); + COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); // telemetry::set_span_attribute(span, _num_disks_accessed_counter); // telemetry::set_span_attribute(span, _num_scanners); - // init progress - // std::stringstream ss; - // ss << "ScanThread complete (node=" << id() << "):"; - // _progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 508c8851d5..a721737585 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -31,6 +31,8 @@ public: void set_scan_ranges(const std::vector& scan_ranges) override; + std::string get_name() override; + protected: Status _init_profile() override; Status _process_conjuncts() override; @@ -59,6 +61,71 @@ private: OlapScanKeys _scan_keys; std::unique_ptr _scanner_mem_tracker; + +private: + std::unique_ptr _segment_profile; + + RuntimeProfile::Counter* _num_disks_accessed_counter = nullptr; + + RuntimeProfile::Counter* _tablet_counter = nullptr; + RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr; + RuntimeProfile::Counter* _reader_init_timer = nullptr; + + RuntimeProfile::Counter* _io_timer = nullptr; + RuntimeProfile::Counter* _read_compressed_counter = nullptr; + RuntimeProfile::Counter* _decompressor_timer = nullptr; + RuntimeProfile::Counter* _read_uncompressed_counter = nullptr; + RuntimeProfile::Counter* _raw_rows_counter = nullptr; + + RuntimeProfile::Counter* _rows_vec_cond_counter = nullptr; + RuntimeProfile::Counter* _vec_cond_timer = nullptr; + RuntimeProfile::Counter* _short_cond_timer = nullptr; + RuntimeProfile::Counter* _output_col_timer = nullptr; + + RuntimeProfile::Counter* _stats_filtered_counter = nullptr; + RuntimeProfile::Counter* _bf_filtered_counter = nullptr; + RuntimeProfile::Counter* _del_filtered_counter = nullptr; + RuntimeProfile::Counter* _conditions_filtered_counter = nullptr; + RuntimeProfile::Counter* _key_range_filtered_counter = nullptr; + + RuntimeProfile::Counter* _block_fetch_timer = nullptr; + RuntimeProfile::Counter* _block_load_timer = nullptr; + RuntimeProfile::Counter* _block_load_counter = nullptr; + // Add more detail seek timer and counter profile + // Read process is split into 3 stages: init, first read, lazy read + RuntimeProfile::Counter* _block_init_timer = nullptr; + RuntimeProfile::Counter* _block_init_seek_timer = nullptr; + RuntimeProfile::Counter* _block_init_seek_counter = nullptr; + RuntimeProfile::Counter* _first_read_timer = nullptr; + RuntimeProfile::Counter* _first_read_seek_timer = nullptr; + RuntimeProfile::Counter* _first_read_seek_counter = nullptr; + RuntimeProfile::Counter* _lazy_read_timer = nullptr; + RuntimeProfile::Counter* _lazy_read_seek_timer = nullptr; + RuntimeProfile::Counter* _lazy_read_seek_counter = nullptr; + + RuntimeProfile::Counter* _block_convert_timer = nullptr; + + // total pages read + // used by segment v2 + RuntimeProfile::Counter* _total_pages_num_counter = nullptr; + // page read from cache + // used by segment v2 + RuntimeProfile::Counter* _cached_pages_num_counter = nullptr; + + // row count filtered by bitmap inverted index + RuntimeProfile::Counter* _bitmap_index_filter_counter = nullptr; + // time fro bitmap inverted index read and filter + RuntimeProfile::Counter* _bitmap_index_filter_timer = nullptr; + // number of created olap scanners + RuntimeProfile::Counter* _num_scanners = nullptr; + + // number of segment filtered by column stat when creating seg iterator + RuntimeProfile::Counter* _filtered_segment_counter = nullptr; + // total number of segment related to this scan node + RuntimeProfile::Counter* _total_segment_counter = nullptr; + + // for debugging or profiling, record any info as you want + RuntimeProfile::Counter* _general_debug_timer[GENERAL_DEBUG_COUNT] = {}; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index b99ce8281d..00e9b2bc34 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -114,11 +114,8 @@ Status NewOlapScanner::prepare( Status NewOlapScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); - // SCOPED_TIMER(_parent->_reader_init_timer); // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); - auto res = _tablet_reader->init(_tablet_reader_params); if (!res.ok()) { std::stringstream ss; @@ -177,6 +174,12 @@ Status NewOlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.delete_predicates, _tablet_reader_params.delete_predicates.begin())); + // Merge the columns in delete predicate that not in latest schema in to current tablet schema + for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) { + _tablet_schema->merge_dropped_columns( + _tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + } + // Range for (auto key_range : key_ranges) { if (key_range->begin_scan_range.size() == 1 && @@ -293,6 +296,7 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* if (block->rows() > 0) { *eof = false; } + _update_realtime_counters(); return Status::OK(); } @@ -308,18 +312,97 @@ Status NewOlapScanner::close(RuntimeState* state) { // deconstructor in reader references runtime state // so that it will core _tablet_reader_params.rs_readers.clear(); - _update_counter(); _tablet_reader.reset(); - // Expr::close(_conjunct_ctxs, state); - _is_closed = true; - return Status::OK(); RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); } -void NewOlapScanner::_update_counter() { - // TODO +void NewOlapScanner::_update_realtime_counters() { + NewOlapScanNode* olap_parent = (NewOlapScanNode*)_parent; + auto& stats = _tablet_reader->stats(); + COUNTER_UPDATE(olap_parent->_read_compressed_counter, stats.compressed_bytes_read); + _compressed_bytes_read += stats.compressed_bytes_read; + _tablet_reader->mutable_stats()->compressed_bytes_read = 0; + + COUNTER_UPDATE(olap_parent->_raw_rows_counter, stats.raw_rows_read); + // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash + _raw_rows_read += stats.raw_rows_read; + _tablet_reader->mutable_stats()->raw_rows_read = 0; +} + +void NewOlapScanner::_update_counters_before_close() { + if (!_state->enable_profile()) return; + + if (_has_updated_counter) { + return; + } + _has_updated_counter = true; + + VScanner::_update_counters_before_close(); + + // Update counters for NewOlapScanner + NewOlapScanNode* olap_parent = (NewOlapScanNode*)_parent; + + // Update counters from tablet reader's stats + auto& stats = _tablet_reader->stats(); + COUNTER_UPDATE(olap_parent->_io_timer, stats.io_ns); + COUNTER_UPDATE(olap_parent->_read_compressed_counter, stats.compressed_bytes_read); + _compressed_bytes_read += stats.compressed_bytes_read; + COUNTER_UPDATE(olap_parent->_decompressor_timer, stats.decompress_ns); + COUNTER_UPDATE(olap_parent->_read_uncompressed_counter, stats.uncompressed_bytes_read); + + COUNTER_UPDATE(olap_parent->_block_load_timer, stats.block_load_ns); + COUNTER_UPDATE(olap_parent->_block_load_counter, stats.blocks_load); + COUNTER_UPDATE(olap_parent->_block_fetch_timer, stats.block_fetch_ns); + COUNTER_UPDATE(olap_parent->_block_convert_timer, stats.block_convert_ns); + + COUNTER_UPDATE(olap_parent->_raw_rows_counter, stats.raw_rows_read); + // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash + _raw_rows_read += _tablet_reader->mutable_stats()->raw_rows_read; + COUNTER_UPDATE(olap_parent->_vec_cond_timer, stats.vec_cond_ns); + COUNTER_UPDATE(olap_parent->_short_cond_timer, stats.short_cond_ns); + COUNTER_UPDATE(olap_parent->_block_init_timer, stats.block_init_ns); + COUNTER_UPDATE(olap_parent->_block_init_seek_timer, stats.block_init_seek_ns); + COUNTER_UPDATE(olap_parent->_block_init_seek_counter, stats.block_init_seek_num); + COUNTER_UPDATE(olap_parent->_first_read_timer, stats.first_read_ns); + COUNTER_UPDATE(olap_parent->_first_read_seek_timer, stats.block_first_read_seek_ns); + COUNTER_UPDATE(olap_parent->_first_read_seek_counter, stats.block_first_read_seek_num); + COUNTER_UPDATE(olap_parent->_lazy_read_timer, stats.lazy_read_ns); + COUNTER_UPDATE(olap_parent->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns); + COUNTER_UPDATE(olap_parent->_lazy_read_seek_counter, stats.block_lazy_read_seek_num); + COUNTER_UPDATE(olap_parent->_output_col_timer, stats.output_col_ns); + COUNTER_UPDATE(olap_parent->_rows_vec_cond_counter, stats.rows_vec_cond_filtered); + + COUNTER_UPDATE(olap_parent->_stats_filtered_counter, stats.rows_stats_filtered); + COUNTER_UPDATE(olap_parent->_bf_filtered_counter, stats.rows_bf_filtered); + COUNTER_UPDATE(olap_parent->_del_filtered_counter, stats.rows_del_filtered); + COUNTER_UPDATE(olap_parent->_del_filtered_counter, stats.rows_del_by_bitmap); + COUNTER_UPDATE(olap_parent->_del_filtered_counter, stats.rows_vec_del_cond_filtered); + + COUNTER_UPDATE(olap_parent->_conditions_filtered_counter, stats.rows_conditions_filtered); + COUNTER_UPDATE(olap_parent->_key_range_filtered_counter, stats.rows_key_range_filtered); + + size_t timer_count = sizeof(stats.general_debug_ns) / sizeof(*stats.general_debug_ns); + for (size_t i = 0; i < timer_count; ++i) { + COUNTER_UPDATE(olap_parent->_general_debug_timer[i], stats.general_debug_ns[i]); + } + + COUNTER_UPDATE(olap_parent->_total_pages_num_counter, stats.total_pages_num); + COUNTER_UPDATE(olap_parent->_cached_pages_num_counter, stats.cached_pages_num); + + COUNTER_UPDATE(olap_parent->_bitmap_index_filter_counter, stats.rows_bitmap_index_filtered); + COUNTER_UPDATE(olap_parent->_bitmap_index_filter_timer, stats.bitmap_index_filter_timer); + + COUNTER_UPDATE(olap_parent->_filtered_segment_counter, stats.filtered_segment_number); + COUNTER_UPDATE(olap_parent->_total_segment_counter, stats.total_segment_number); + + // Update metrics + DorisMetrics::instance()->query_scan_bytes->increment(_compressed_bytes_read); + DorisMetrics::instance()->query_scan_rows->increment(_raw_rows_read); + _tablet->query_scan_bytes->increment(_compressed_bytes_read); + _tablet->query_scan_rows->increment(_raw_rows_read); + _tablet->query_scan_count->increment(1); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 73c0330b20..7e784d69fc 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -47,11 +47,14 @@ public: bloom_filters, const std::vector& function_filters); + const std::string& scan_disk() const { return _tablet->data_dir()->path(); } + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + void _update_counters_before_close() override; private: - void _update_counter(); + void _update_realtime_counters(); Status _init_tablet_reader_params( const std::vector& key_ranges, const std::vector& filters, @@ -74,6 +77,10 @@ private: std::vector _return_columns; std::unordered_set _tablet_columns_convert_to_null_set; + + // ========= profiles ========== + int64_t _compressed_bytes_read = 0; + int64_t _raw_rows_read = 0; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index bef4fe8e66..cfe15a0ab1 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -22,6 +22,7 @@ #include "util/threadpool.h" #include "vec/core/block.h" #include "vec/exec/scan/scanner_scheduler.h" +#include "vec/exec/scan/vscan_node.h" #include "vec/exec/scan/vscanner.h" namespace doris::vectorized { @@ -69,6 +70,9 @@ Status ScannerContext::init() { _num_unfinished_scanners = _scanners.size(); + COUNTER_SET(_parent->_pre_alloc_free_blocks_num, (int64_t)_free_blocks.size()); + COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num); + return Status::OK(); } @@ -83,6 +87,7 @@ vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) { } *get_free_block = false; + COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1); return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size()); } @@ -105,6 +110,11 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos std::unique_lock l(_transfer_lock); // Wait for block from queue while (_process_status.ok() && !_is_finished && blocks_queue.empty()) { + if (_state->is_cancelled()) { + _process_status = Status::Cancelled("cancelled"); + break; + } + SCOPED_TIMER(_parent->_scanner_wait_batch_timer); _blocks_queue_added_cv.wait_for(l, std::chrono::seconds(1)); } @@ -145,8 +155,6 @@ Status ScannerContext::_close_and_clear_scanners() { } void ScannerContext::clear_and_join() { - _close_and_clear_scanners(); - std::unique_lock l(_transfer_lock); do { if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { @@ -158,10 +166,15 @@ void ScannerContext::clear_and_join() { } } while (false); + // Must wait all running scanners stop running. + // So that we can make sure to close all scanners. + _close_and_clear_scanners(); + std::for_each(blocks_queue.begin(), blocks_queue.end(), std::default_delete()); std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete()); + return; } @@ -189,12 +202,14 @@ void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* schedule scheduler->submit(this); if (scanner->need_to_close() && (--_num_unfinished_scanners) == 0) { _is_finished = true; + COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling); + COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling); _blocks_queue_added_cv.notify_one(); } _ctx_finish_cv.notify_one(); } -void ScannerContext::get_next_batch_of_scanners(std::list* current_run) { +bool ScannerContext::get_next_batch_of_scanners(std::list* current_run) { // 1. Calculate how many scanners should be scheduled at this run. int thread_slot_num = 0; { @@ -209,6 +224,11 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current_ru if (thread_slot_num <= 0) { thread_slot_num = 1; } + } else { + // The blocks queue reaches limit, so no more scanners will be scheduled this time. + // We need to return false so that the scanner scheduler will push bash this ctx + // to the scheduling queue, waiting next scheduling. + return false; } } @@ -227,6 +247,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current_ru } } } + return true; } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 10728bc7df..3e5d052412 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -34,6 +34,7 @@ class ScannerScheduler; namespace vectorized { class VScanner; +class VScanNode; // ScannerContext is responsible for recording the execution status // of a group of Scanners corresponding to a ScanNode. @@ -45,10 +46,11 @@ class VScanner; // and submits the Scanners to the scanner thread pool for data scanning. class ScannerContext { public: - ScannerContext(RuntimeState* state_, const TupleDescriptor* input_tuple_desc, + ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, const std::list& scanners_, int64_t limit_, int64_t max_bytes_in_blocks_queue_) : _state(state_), + _parent(parent), _input_tuple_desc(input_tuple_desc), _output_tuple_desc(output_tuple_desc), _process_status(Status::OK()), @@ -108,7 +110,7 @@ public: _ctx_finish_cv.notify_one(); } - void get_next_batch_of_scanners(std::list* current_run); + bool get_next_batch_of_scanners(std::list* current_run); void clear_and_join(); @@ -116,6 +118,11 @@ public: RuntimeState* state() { return _state; } + void incr_num_ctx_scheduling(int64_t num) { _num_ctx_scheduling += num; } + void incr_num_scanner_scheduling(int64_t num) { _num_scanner_scheduling += num; } + + VScanNode* parent() { return _parent; } + public: // the unique id of this context std::string ctx_id; @@ -127,6 +134,7 @@ private: private: RuntimeState* _state; + VScanNode* _parent; // the comment of same fields in VScanNode const TupleDescriptor* _input_tuple_desc; @@ -199,7 +207,8 @@ private: std::mutex _scanners_lock; std::list _scanners; - // TODO: Add statistics of this scanner + int64_t _num_ctx_scheduling = 0; + int64_t _num_scanner_scheduling = 0; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7ddd5e304b..fc8ac3e701 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -91,15 +91,24 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { + ctx->incr_num_ctx_scheduling(1); if (ctx->done()) { ctx->update_num_running(0, -1); return; } std::list this_run; - ctx->get_next_batch_of_scanners(&this_run); + bool res = ctx->get_next_batch_of_scanners(&this_run); if (this_run.empty()) { - submit(ctx); + if (!res) { + // This means we failed to choose scanners this time, and there may be no other scanners running. + // So we need to submit this ctx back to queue to be scheduled next time. + submit(ctx); + } else { + // No need to push back this ctx to reschedule + // There will be running scanners to push it back. + ctx->update_num_running(0, -1); + } return; } @@ -109,15 +118,16 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { int nice = 1; auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); auto iter = this_run.begin(); + ctx->incr_num_scanner_scheduling(this_run.size()); if (ctx->thread_token != nullptr) { while (iter != this_run.end()) { + (*iter)->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func( [this, scanner = *iter, parent_span = cur_span, ctx] { opentelemetry::trace::Scope scope {parent_span}; this->_scanner_scan(this, ctx, scanner); }); if (s.ok()) { - (*iter)->start_wait_worker_timer(); this_run.erase(iter++); } else { ctx->set_status_on_error(s); @@ -157,11 +167,11 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext VScanner* scanner) { // TODO: rethink mem tracker and span // START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span, - // "ScannerScheduler::_scanner_scan"); - // SCOPED_ATTACH_TASK(scanner->runtime_state()); + // "ScannerScheduler::_scanner_scan"); + SCOPED_ATTACH_TASK(scanner->runtime_state()); Thread::set_self_name("_scanner_scan"); - // int64_t wait_time = scanner->update_wait_worker_timer(); + scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`. ThreadCpuStopWatch cpu_watch; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 26f8847c1d..ba80c4aea3 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -88,7 +88,7 @@ Status VScanNode::prepare(RuntimeState* state) { } Status VScanNode::open(RuntimeState* state) { - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); + // START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); @@ -102,13 +102,16 @@ Status VScanNode::open(RuntimeState* state) { if (scanners.empty()) { _eos = true; } else { + COUNTER_SET(_num_scanners, static_cast(scanners.size())); RETURN_IF_ERROR(_start_scanners(scanners)); } return Status::OK(); } Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + // INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (state->is_cancelled()) { _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); return _scanner_ctx->status(); @@ -139,9 +142,42 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* return Status::OK(); } +Status VScanNode::_init_profile() { + // 1. counters for scan node + _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT); + _total_throughput_counter = + runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); + _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); + + // 2. counters for scanners + _scanner_profile.reset(new RuntimeProfile("VScanner")); + runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); + + _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); + _prefilter_timer = ADD_TIMER(_scanner_profile, "ScannerPrefilterTime"); + _convert_block_timer = ADD_TIMER(_scanner_profile, "ScannerConvertBlockTime"); + _filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime"); + + _scanner_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerSchedCount", TUnit::UNIT); + _scanner_ctx_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerCtxSchedCount", TUnit::UNIT); + // time of transfer thread to wait for block from scan thread + _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime"); + // time of scan thread to wait for worker thread of the thread pool + _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); + + _pre_alloc_free_blocks_num = + ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum", TUnit::UNIT); + _newly_create_free_blocks_num = + ADD_COUNTER(_runtime_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); + _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); + + return Status::OK(); +} + Status VScanNode::_start_scanners(const std::list& scanners) { - _scanner_ctx.reset(new ScannerContext(_state, _input_tuple_desc, _output_tuple_desc, scanners, - limit(), _state->query_options().mem_limit / 20)); + _scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc, + scanners, limit(), + _state->query_options().mem_limit / 20)); RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); return Status::OK(); @@ -189,7 +225,14 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector& vexprs) { return Status::OK(); } - auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; + VExpr* last_expr = nullptr; + if (_vconjunct_ctx_ptr) { + last_expr = (*_vconjunct_ctx_ptr)->root(); + } else { + DCHECK(_rf_vexpr_set.find(vexprs[0]) == _rf_vexpr_set.end()); + last_expr = vexprs[0]; + _rf_vexpr_set.insert(vexprs[0]); + } for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) { continue; @@ -239,7 +282,7 @@ Status VScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } - START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close"); + // START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close"); if (_scanner_ctx.get()) { // stop and wait the scanner scheduler to be done // _scanner_ctx may not be created for some short circuit case. diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 292234162e..a37074948a 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -34,8 +34,10 @@ class VSlotRef; class VScanNode : public ExecNode { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} + : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} + friend class VScanner; friend class NewOlapScanner; + friend class ScannerContext; Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; @@ -73,7 +75,7 @@ public: protected: // Different data sources register different profiles by implementing this method - virtual Status _init_profile() { return Status::OK(); } + virtual Status _init_profile(); // Process predicates, extract the predicates in the conjuncts that can be pushed down // to the data source, and convert them into common expressions structure ColumnPredicate. @@ -191,6 +193,36 @@ protected: // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; +protected: + std::unique_ptr _scanner_profile; + + // rows read from the scanner (including those discarded by (pre)filters) + RuntimeProfile::Counter* _rows_read_counter; + // Wall based aggregate read throughput [rows/sec] + RuntimeProfile::Counter* _total_throughput_counter; + RuntimeProfile::Counter* _num_scanners; + + // time of get block from scanner + RuntimeProfile::Counter* _scan_timer = nullptr; + // time of prefilter input block from scanner + RuntimeProfile::Counter* _prefilter_timer = nullptr; + // time of convert input block to output block from scanner + RuntimeProfile::Counter* _convert_block_timer = nullptr; + // time of filter output block from scanner + RuntimeProfile::Counter* _filter_timer = nullptr; + + RuntimeProfile::Counter* _scanner_sched_counter = nullptr; + RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; + RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; + RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; + + // Num of pre allocated free blocks + RuntimeProfile::Counter* _pre_alloc_free_blocks_num = nullptr; + // Num of newly created free blocks when running query + RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; + // Max num of scanner thread + RuntimeProfile::Counter* _max_scanner_thread_num = nullptr; + private: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(); diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index e154e0e6ba..40de8af22c 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -48,28 +48,36 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _init_input_block(block); { - // SCOPED_TIMER(_parent->_scan_timer); do { // 1. Get input block from scanner - RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof)); - if (*eof) { - DCHECK(_input_block_ptr->rows() == 0); - break; + { + SCOPED_TIMER(_parent->_scan_timer); + RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof)); + if (*eof) { + DCHECK(_input_block_ptr->rows() == 0); + break; + } + _num_rows_read += _input_block_ptr->rows(); } - _num_rows_read += _input_block_ptr->rows(); - // _update_realtime_counter(); // 2. For load, use prefilter to filter the input block first. - RETURN_IF_ERROR(_filter_input_block(_input_block_ptr)); + { + SCOPED_TIMER(_parent->_prefilter_timer); + RETURN_IF_ERROR(_filter_input_block(_input_block_ptr)); + } // 3. For load, convert input block to output block - RETURN_IF_ERROR(_convert_to_output_block(block)); + { + SCOPED_TIMER(_parent->_convert_block_timer); + RETURN_IF_ERROR(_convert_to_output_block(block)); + } // 4. Filter the output block finally. // NOTE that step 2/3 may be skipped, for Query. - RETURN_IF_ERROR(_filter_output_block(block)); - // record rows return (after filter) for _limit check - _num_rows_return += block->rows(); + { + SCOPED_TIMER(_parent->_filter_timer); + RETURN_IF_ERROR(_filter_output_block(block)); + } } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); } @@ -148,8 +156,15 @@ Status VScanner::close(RuntimeState* state) { if (_vconjunct_ctx) { _vconjunct_ctx->close(state); } + + COUNTER_UPDATE(_parent->_scanner_wait_worker_timer, _scanner_wait_worker_timer); _is_closed = true; return Status::OK(); } +void VScanner::_update_counters_before_close() { + if (!_state->enable_profile()) return; + COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index aff3c60393..55739b2fa5 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -46,6 +46,9 @@ protected: // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; + // Update the counters before closing this scanner + virtual void _update_counters_before_close(); + // Init the input block if _input_tuple_desc is set. // Otherwise, use output_block directly. void _init_input_block(Block* output_block); @@ -66,8 +69,12 @@ public: // Call start_wait_worker_timer() when submit the scanner to the thread pool. // And call update_wait_worker_timer() when it is actually being executed. - void start_wait_worker_timer() {} - int64_t update_wait_worker_timer() { return 0; } + void start_wait_worker_timer() { + _watch.reset(); + _watch.start(); + } + + void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); } RuntimeState* runtime_state() { return _state; } @@ -82,7 +89,10 @@ public: bool need_to_close() { return _need_to_close; } - void mark_to_need_to_close() { _need_to_close = true; } + void mark_to_need_to_close() { + _update_counters_before_close(); + _need_to_close = true; + } void set_status_on_failure(const Status& st) { _status = st; } @@ -132,9 +142,15 @@ protected: // and will be destroyed at the end. std::vector _stale_vexpr_ctxs; + // num of rows read from scanner int64_t _num_rows_read = 0; - int64_t _raw_rows_read = 0; - int64_t _num_rows_return = 0; + + // Set true after counter is updated finally + bool _has_updated_counter = false; + + // watch to count the time wait for scanner thread + MonotonicStopWatch _watch; + int64_t _scanner_wait_worker_timer = 0; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 881c53742d..0e2bc17697 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -1840,4 +1840,9 @@ Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector } return Status::OK(); } + +std::string VOlapScanNode::get_name() { + return fmt::format("VOlapScanNode({0})", _olap_scan_node.table_name); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index e83b1c0e48..7e50ac629d 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -62,6 +62,8 @@ public: std::vector>* sub_scan_range, RuntimeProfile* profile); + std::string get_name() override; + private: // In order to ensure the accuracy of the query result // only key column conjuncts will be remove as idle conjunct diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index d6a64c9d9b..199c8076d8 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -18,6 +18,7 @@ #include "vec/exprs/vexpr_context.h" #include "udf/udf_internal.h" +#include "util/stack_util.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized {