diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 19e6fe87c4..f46c463d29 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -142,12 +142,12 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) { Status AnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0]; - _curr_tuple_pool.reset(new MemPool(mem_tracker())); - _prev_tuple_pool.reset(new MemPool(mem_tracker())); - _mem_pool.reset(new MemPool(mem_tracker())); + _curr_tuple_pool.reset(new MemPool(mem_tracker_held())); + _prev_tuple_pool.reset(new MemPool(mem_tracker_held())); + _mem_pool.reset(new MemPool(mem_tracker_held())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size()); @@ -186,7 +186,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { Status AnalyticEvalNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); //RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->open(state)); @@ -814,7 +814,7 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const { Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); //RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(state->check_query_state("Analytic eval, while get_next.")); diff --git a/be/src/exec/assert_num_rows_node.cpp b/be/src/exec/assert_num_rows_node.cpp index 6be5eca938..36deaab07e 100644 --- a/be/src/exec/assert_num_rows_node.cpp +++ b/be/src/exec/assert_num_rows_node.cpp @@ -50,7 +50,7 @@ Status AssertNumRowsNode::prepare(RuntimeState* state) { Status AssertNumRowsNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // ISSUE-3435 RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); @@ -58,7 +58,7 @@ Status AssertNumRowsNode::open(RuntimeState* state) { Status AssertNumRowsNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); output_batch->reset(); child(0)->get_next(state, output_batch, eos); _num_rows_returned += output_batch->num_rows(); diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 0d366ab1d4..2bae8e3668 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -52,9 +52,9 @@ BlockingJoinNode::~BlockingJoinNode() { Status BlockingJoinNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker_held())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _left_child_timer = ADD_TIMER(runtime_profile(), "LeftChildTime"); _build_row_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); @@ -93,14 +93,14 @@ Status BlockingJoinNode::close(RuntimeState* state) { void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); status->set_value(construct_build_side(state)); } Status BlockingJoinNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // RETURN_IF_ERROR(Expr::open(_conjuncts, state)); RETURN_IF_CANCELLED(state); diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index a281d29c6e..5448e059e1 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -63,7 +63,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status BrokerScanNode::prepare(RuntimeState* state) { VLOG_QUERY << "BrokerScanNode prepare"; RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _runtime_state = state; _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -88,7 +88,7 @@ Status BrokerScanNode::prepare(RuntimeState* state) { Status BrokerScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(start_scanners()); @@ -107,7 +107,7 @@ Status BrokerScanNode::start_scanners() { Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // check if CANCELLED. if (state->is_cancelled()) { std::unique_lock l(_batch_queue_lock); @@ -376,7 +376,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, void BrokerScanNode::scanner_worker(int start_idx, int length) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); // Clone expr context std::vector scanner_expr_ctxs; auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp index fe748904f0..b4e25f8b1d 100644 --- a/be/src/exec/cross_join_node.cpp +++ b/be/src/exec/cross_join_node.cpp @@ -33,7 +33,7 @@ CrossJoinNode::CrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Des Status CrossJoinNode::prepare(RuntimeState* state) { DCHECK(_join_op == TJoinOp::CROSS_JOIN); RETURN_IF_ERROR(BlockingJoinNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_batch_pool.reset(new ObjectPool()); return Status::OK(); } @@ -86,7 +86,7 @@ Status CrossJoinNode::get_next(RuntimeState* state, RowBatch* output_batch, bool // TOOD(zhaochun) // RETURN_IF_ERROR(state->check_query_state()); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit() || _eos) { *eos = true; diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp index 4750e54b36..ab4b63320d 100644 --- a/be/src/exec/csv_scan_node.cpp +++ b/be/src/exec/csv_scan_node.cpp @@ -122,7 +122,7 @@ Status CsvScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // add timer _split_check_timer = ADD_TIMER(_runtime_profile, "split check timer"); @@ -207,7 +207,7 @@ Status CsvScanNode::prepare(RuntimeState* state) { Status CsvScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); VLOG_CRITICAL << "CsvScanNode::Open"; if (nullptr == state) { @@ -238,7 +238,7 @@ Status CsvScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit()) { *eos = true; diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index cea7068631..73ee88b018 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -66,7 +66,7 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status EsHttpScanNode::prepare(RuntimeState* state) { VLOG_QUERY << "EsHttpScanNode prepare"; RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _scanner_profile.reset(new RuntimeProfile("EsHttpScanNode")); runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); @@ -122,7 +122,7 @@ Status EsHttpScanNode::build_conjuncts_list() { Status EsHttpScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); if (_properties.find(ESScanReader::KEY_QUERY_DSL) != _properties.end()) { @@ -200,7 +200,7 @@ Status EsHttpScanNode::collect_scanners_status() { Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (state->is_cancelled()) { std::unique_lock l(_batch_queue_lock); if (update_status(Status::Cancelled("Cancelled"))) { @@ -424,7 +424,7 @@ static std::string get_host_port(const std::vector& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise& p_status) { SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); // Clone expr context std::vector scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index f12f6ca18a..4a1adf2f43 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -40,7 +40,7 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); @@ -86,7 +86,7 @@ Status ExceptNode::open(RuntimeState* state) { Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); *eos = true; if (reached_limit()) { return Status::OK(); diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 410ed4ace1..f9b9caa6fb 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -60,7 +60,7 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime"); // TODO: figure out appropriate buffer size DCHECK_GT(_num_senders, 0); @@ -79,7 +79,7 @@ Status ExchangeNode::prepare(RuntimeState* state) { Status ExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_is_merging) { RETURN_IF_ERROR(_sort_exec_exprs.open(state)); TupleRowComparator less_than(_sort_exec_exprs, _is_asc_order, _nulls_first); @@ -132,7 +132,7 @@ Status ExchangeNode::fill_input_row_batch(RuntimeState* state) { Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit()) { _stream_recvr->transfer_all_resources(output_batch); diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index e4bf0b3b13..29462ac037 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -219,9 +219,18 @@ Status ExecNode::prepare(RuntimeState* state) { std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, runtime_profile()->total_time_counter()), ""); - _mem_tracker = std::make_shared("ExecNode:" + _runtime_profile->name(), - _runtime_profile.get()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + _mem_tracker_held = + std::make_unique("ExecNode:" + _runtime_profile->name(), + _runtime_profile.get(), nullptr, "PeakMemoryUsage"); + // Only when the query profile is enabled, the node allocated memory will be track through the mem hook, + // otherwise _mem_tracker_growh is nullptr, and SCOPED_CONSUME_MEM_TRACKER will do nothing. + if (state->query_options().__isset.is_report_success && + state->query_options().is_report_success) { + _mem_tracker_growh = std::make_shared( + "ExecNode:MemoryOnlyTrackAlloc:" + _runtime_profile->name(), _runtime_profile.get(), + nullptr, "MemoryOnlyTrackAllocNoConsiderFree", true); + } + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, intermediate_row_desc())); @@ -243,7 +252,7 @@ Status ExecNode::prepare(RuntimeState* state) { } Status ExecNode::open(RuntimeState* state) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 1de8f921a8..7f5ba74656 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -194,8 +194,9 @@ public: RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); } RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; } - MemTracker* mem_tracker() const { return _mem_tracker.get(); } - std::shared_ptr mem_tracker_shared() const { return _mem_tracker; } + MemTracker* mem_tracker_held() const { return _mem_tracker_held.get(); } + MemTracker* mem_tracker_growh() const { return _mem_tracker_growh.get(); } + std::shared_ptr mem_tracker_growh_shared() const { return _mem_tracker_growh; } OpentelemetrySpan get_next_span() { return _get_next_span; } @@ -299,8 +300,11 @@ protected: std::unique_ptr _runtime_profile; - /// Account for peak memory used by this node - std::shared_ptr _mem_tracker; + // Record the memory size held by this node. + std::unique_ptr _mem_tracker_held; + // Record the memory size allocated by this node. + // Similar to tcmalloc heap profile growh, only track memory alloc, not track memory free. + std::shared_ptr _mem_tracker_growh; RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _rows_returned_rate; diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 471801abca..f55ae79be9 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -103,9 +103,9 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker_held())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); @@ -184,7 +184,7 @@ Status HashJoinNode::close(RuntimeState* state) { void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise* status) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); status->set_value(child(0)->open(state)); } @@ -230,7 +230,7 @@ Status HashJoinNode::open(RuntimeState* state) { } RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state)); RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state)); @@ -321,7 +321,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo // but if the expression calculation in this node needs to apply for additional memory, // it may cause the memory to exceed the limit. SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit()) { *eos = true; diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index d04ddc5061..154a639594 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -44,7 +44,7 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); @@ -85,7 +85,7 @@ Status IntersectNode::open(RuntimeState* state) { Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); *eos = true; if (reached_limit()) { return Status::OK(); diff --git a/be/src/exec/merge_node.cpp b/be/src/exec/merge_node.cpp index 07d3f21e80..f0f57950b7 100644 --- a/be/src/exec/merge_node.cpp +++ b/be/src/exec/merge_node.cpp @@ -63,7 +63,7 @@ Status MergeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status MergeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); @@ -93,7 +93,7 @@ Status MergeNode::prepare(RuntimeState* state) { Status MergeNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // Prepare const expr lists. for (int i = 0; i < _const_result_expr_ctx_lists.size(); ++i) { RETURN_IF_ERROR(Expr::open(_const_result_expr_ctx_lists[i], state)); @@ -110,7 +110,7 @@ Status MergeNode::open(RuntimeState* state) { Status MergeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // Create new tuple buffer for row_batch. int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size(); void* tuple_buffer = row_batch->tuple_data_pool()->allocate(tuple_buffer_size); diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp index 5df00ee9ac..734ebeeba0 100644 --- a/be/src/exec/mysql_scan_node.cpp +++ b/be/src/exec/mysql_scan_node.cpp @@ -52,7 +52,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -102,7 +102,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) { Status MysqlScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); VLOG_CRITICAL << "MysqlScanNode::Open"; if (nullptr == state) { @@ -157,7 +157,7 @@ Status MysqlScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // create new tuple buffer for row_batch int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size(); diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp index 84194c7510..23b9ec51b1 100644 --- a/be/src/exec/odbc_scan_node.cpp +++ b/be/src/exec/odbc_scan_node.cpp @@ -54,7 +54,7 @@ Status OdbcScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -94,7 +94,7 @@ Status OdbcScanNode::prepare(RuntimeState* state) { Status OdbcScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); VLOG_CRITICAL << _scan_node_type << "::Open"; if (nullptr == state) { @@ -139,7 +139,7 @@ Status OdbcScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit()) { *eos = true; diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 5bf924d1c1..e1b62781c8 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -180,7 +180,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) { Status OlapScanNode::prepare(RuntimeState* state) { init_scan_profile(); RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // create scanner profile // create timer _tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT); @@ -232,7 +232,7 @@ Status OlapScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _resource_info = ResourceTls::get_resource_tls(); @@ -276,7 +276,7 @@ Status OlapScanNode::open(RuntimeState* state) { Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // check if Canceled. if (state->is_cancelled()) { std::unique_lock l(_row_batches_lock); @@ -1487,7 +1487,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { void OlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); Status status = Status::OK(); for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); @@ -1664,7 +1664,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } void OlapScanNode::scanner_thread(OlapScanner* scanner) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); Thread::set_self_name("olap_scanner"); if (UNLIKELY(_transfer_done)) { _scanner_done = true; diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index 03cecb794a..ba51a1fb28 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -183,11 +183,11 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); state_ = state; - mem_pool_.reset(new MemPool(mem_tracker())); - agg_fn_pool_.reset(new MemPool(mem_tracker())); + mem_pool_.reset(new MemPool(mem_tracker_held())); + agg_fn_pool_.reset(new MemPool(mem_tracker_held())); ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime"); get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime"); @@ -230,7 +230,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(), &agg_fn_evals_, row_desc)); - expr_results_pool_.reset(new MemPool(mem_tracker())); + expr_results_pool_.reset(new MemPool(mem_tracker_held())); if (!grouping_exprs_.empty()) { RowDescriptor build_row_desc(intermediate_tuple_desc_, false); RETURN_IF_ERROR(PartitionedHashTableCtx::Create( @@ -248,7 +248,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { // Open the child before consuming resources in this node. RETURN_IF_ERROR(child(0)->open(state)); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // Claim reservation after the child has been opened to reduce the peak reservation // requirement. @@ -337,7 +337,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { } Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // 1. `!need_finalize` means this aggregation node not the level two aggregation node // 2. `grouping_exprs_.size() == 0 ` means is not group by // 3. `child(0)->rows_returned() == 0` mean not data from child @@ -717,7 +717,7 @@ PartitionedAggregationNode::Partition::~Partition() { } Status PartitionedAggregationNode::Partition::InitStreams() { - agg_fn_pool.reset(new MemPool(parent->mem_tracker())); + agg_fn_pool.reset(new MemPool(parent->mem_tracker_held())); DCHECK_EQ(agg_fn_evals.size(), 0); NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), parent->agg_fn_evals_, &agg_fn_evals); diff --git a/be/src/exec/repeat_node.cpp b/be/src/exec/repeat_node.cpp index 90ff8c9d43..6db760db7f 100644 --- a/be/src/exec/repeat_node.cpp +++ b/be/src/exec/repeat_node.cpp @@ -53,7 +53,7 @@ Status RepeatNode::init(const TPlanNode& tnode, RuntimeState* state) { Status RepeatNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _runtime_state = state; _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); if (_output_tuple_desc == nullptr) { @@ -72,7 +72,7 @@ Status RepeatNode::prepare(RuntimeState* state) { Status RepeatNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); for (int i = 0; i < _expr_evals.size(); i++) { RETURN_IF_ERROR(_expr_evals[i]->open(state)); @@ -149,7 +149,7 @@ Status RepeatNode::get_repeated_batch(RowBatch* child_row_batch, int repeat_id_i Status RepeatNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); DCHECK(_repeat_id_idx >= 0); for (const std::vector& v : _grouping_list) { diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index e52cddb98f..28682b7fd7 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -100,7 +100,7 @@ Status SchemaScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // new one mem pool _tuple_pool.reset(new (std::nothrow) MemPool()); @@ -200,7 +200,7 @@ Status SchemaScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_scanner_param.user) { TSetSessionParams param; @@ -244,7 +244,7 @@ Status SchemaScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit()) { *eos = true; diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp index 81574ac9cc..a343e86d38 100644 --- a/be/src/exec/select_node.cpp +++ b/be/src/exec/select_node.cpp @@ -35,14 +35,14 @@ SelectNode::SelectNode(ObjectPool* pool, const TPlanNode& tnode, const Descripto Status SelectNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _child_row_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size())); return Status::OK(); } Status SelectNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); } @@ -50,7 +50,7 @@ Status SelectNode::open(RuntimeState* state) { Status SelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { RETURN_IF_CANCELLED(state); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (reached_limit() || (_child_row_idx == _child_row_batch->num_rows() && _child_eos)) { // we're already done or we exhausted the last child batch and there won't be any diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 3458c57293..4a75ec234d 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -42,10 +42,10 @@ Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { Status SetOperationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); - _build_pool.reset(new MemPool(mem_tracker())); + _build_pool.reset(new MemPool(mem_tracker_held())); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); for (size_t i = 0; i < _child_expr_lists.size(); ++i) { @@ -130,7 +130,7 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) { Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); // open result expr lists. for (const std::vector& exprs : _child_expr_lists) { diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc index a40d889b0e..f85b6cf1a6 100644 --- a/be/src/exec/spill_sort_node.cc +++ b/be/src/exec/spill_sort_node.cc @@ -42,7 +42,7 @@ Status SpillSortNode::init(const TPlanNode& tnode, RuntimeState* state) { Status SpillSortNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); // AddExprCtxsToFree(_sort_exec_exprs); return Status::OK(); @@ -51,7 +51,7 @@ Status SpillSortNode::prepare(RuntimeState* state) { Status SpillSortNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Spill sort, while open.")); @@ -81,7 +81,7 @@ Status SpillSortNode::open(RuntimeState* state) { Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Spill sort, while getting next.")); diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp index 1eb94aac0e..930e484d2a 100644 --- a/be/src/exec/table_function_node.cpp +++ b/be/src/exec/table_function_node.cpp @@ -90,7 +90,7 @@ bool TableFunctionNode::_is_inner_and_empty() { Status TableFunctionNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT); @@ -106,7 +106,7 @@ Status TableFunctionNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(Expr::open(_fn_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_vfn_ctxs, state)); @@ -198,7 +198,7 @@ bool TableFunctionNode::_roll_table_functions(int last_eos_idx) { // And the inner loop is to expand the row by table functions, and output row by row. Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); const RowDescriptor& parent_rowdesc = row_batch->row_desc(); const RowDescriptor& child_rowdesc = _children[0]->row_desc(); diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index f5215b10d5..dda95e1c6c 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -60,8 +60,8 @@ Status TopNNode::init(const TPlanNode& tnode, RuntimeState* state) { Status TopNNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - _tuple_pool.reset(new MemPool(mem_tracker())); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); + _tuple_pool.reset(new MemPool(mem_tracker_held())); RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); // AddExprCtxsToFree(_sort_exec_exprs); @@ -77,7 +77,7 @@ Status TopNNode::prepare(RuntimeState* state) { Status TopNNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Top n, before open.")); RETURN_IF_ERROR(_sort_exec_exprs.open(state)); @@ -129,7 +129,7 @@ Status TopNNode::open(RuntimeState* state) { Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch.")); diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp index de5e7d7aec..fdf0718750 100644 --- a/be/src/exec/union_node.cpp +++ b/be/src/exec/union_node.cpp @@ -69,7 +69,7 @@ Status UnionNode::init(const TPlanNode& tnode, RuntimeState* state) { Status UnionNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _materialize_exprs_evaluate_timer = @@ -96,7 +96,7 @@ Status UnionNode::prepare(RuntimeState* state) { Status UnionNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // open const expr lists. for (const std::vector& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::open(exprs, state)); @@ -234,7 +234,7 @@ Status UnionNode::get_next_const(RuntimeState* state, RowBatch* row_batch) { Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); if (_to_close_child_idx != -1) { diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 34baeb9ff3..7d1c3de7c1 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -28,8 +28,6 @@ namespace doris { -const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage"; - struct TrackerGroup { std::list trackers; std::mutex group_lock; @@ -41,8 +39,9 @@ struct TrackerGroup { // Multiple groups are used to reduce the impact of locks. static std::vector mem_tracker_pool(1000); -MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent) - : _label(label) { +MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTrackerLimiter* parent, + const std::string& profile_counter_name, bool only_track_alloc) + : _label(label), _only_track_alloc(only_track_alloc) { if (profile == nullptr) { _consumption = std::make_shared(TUnit::BYTES); } else { @@ -55,7 +54,7 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, MemTra // Other consumption metrics are used in trackers below the process level to account // for memory (such as free buffer pool buffers) that is not tracked by consume() and // release(). - _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); + _consumption = profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES); } if (parent) { diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index bf468043e1..46d1ec24d4 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -45,7 +45,9 @@ public: // Creates and adds the tracker to the mem_tracker_pool. MemTracker(const std::string& label, RuntimeProfile* profile = nullptr, - MemTrackerLimiter* parent = nullptr); + MemTrackerLimiter* parent = nullptr, + const std::string& profile_counter_name = "PeakMemoryUsage", + bool only_track_alloc = false); // For MemTrackerLimiter MemTracker() { _parent_group_num = -1; } @@ -65,6 +67,7 @@ public: void consume(int64_t bytes) { if (bytes == 0) return; + if (bytes < 0 && _only_track_alloc) return; _consumption->add(bytes); } void release(int64_t bytes) { consume(-bytes); } @@ -85,8 +88,6 @@ public: return msg.str(); } - static const std::string COUNTER_NAME; - protected: // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -97,6 +98,8 @@ protected: int64_t _parent_group_num = 0; std::string _parent_label = "-"; + bool _only_track_alloc = false; + // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. std::list::iterator _tracker_group_it; }; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index abb3d85d5c..6bac6a8203 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -44,12 +44,13 @@ std::atomic MemTrackerLimiter::_enable_print_log_process_usage {true}; bool MemTrackerLimiter::_oom_avoidance {true}; MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit, - RuntimeProfile* profile) { + RuntimeProfile* profile, + const std::string& profile_counter_name) { DCHECK_GE(byte_limit, -1); if (profile == nullptr) { _consumption = std::make_shared(TUnit::BYTES); } else { - _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); + _consumption = profile->AddSharedHighWaterMarkCounter(profile_counter_name, TUnit::BYTES); } _type = type; _label = label; @@ -67,6 +68,7 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ } MemTrackerLimiter::~MemTrackerLimiter() { + consume(_untracked_mem); // mem hook record tracker cannot guarantee that the final consumption is 0, // nor can it guarantee that the memory alloc and free are recorded in a one-to-one correspondence. // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index dfba608144..5ebda8230b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -74,7 +74,8 @@ public: public: // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1, - RuntimeProfile* profile = nullptr); + RuntimeProfile* profile = nullptr, + const std::string& profile_counter_name = "PeakMemoryUsage"); ~MemTrackerLimiter(); diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 6caafdc5db..150f154c60 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -67,13 +67,16 @@ SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { - _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); + if (mem_tracker) + _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer( const std::shared_ptr& mem_tracker) : _mem_tracker(mem_tracker) { - _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); + if (_mem_tracker) + _need_pop = + thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get()); } AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index cdc8d483f2..fd16150f46 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -258,9 +258,10 @@ private: class AddThreadMemTrackerConsumer { public: // The owner and user of MemTracker are in the same thread, and the raw pointer is faster. + // If mem_tracker is nullptr, do nothing. explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker); - // The owner and user of MemTracker are in different threads. + // The owner and user of MemTracker are in different threads. If mem_tracker is nullptr, do nothing. explicit AddThreadMemTrackerConsumer(const std::shared_ptr& mem_tracker); ~AddThreadMemTrackerConsumer(); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 13337a2e83..ea7fb70d7a 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -348,7 +348,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // Build phase _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); runtime_profile()->add_child(_build_phase_profile, false, nullptr); @@ -635,7 +635,7 @@ Status HashJoinNode::open(RuntimeState* state) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); return Status::OK(); } diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index d5d5280f57..91aad01589 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -155,7 +155,7 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) { Status VJoinNodeBase::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open"); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); std::promise thread_status; @@ -188,7 +188,7 @@ void VJoinNodeBase::_reset_tuple_is_null_column() { void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, std::promise* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); status->set_value(child(0)->open(state)); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 2d3a954903..2a77a19e37 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -104,7 +104,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VNestedLoopJoinNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); @@ -206,7 +206,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_probe_timer); RETURN_IF_CANCELLED(state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_is_output_left_side_only) { RETURN_IF_ERROR(get_left_side(state, &_left_block)); @@ -652,7 +652,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state)); } RETURN_IF_ERROR(VJoinNodeBase::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); // We can close the right child to release its resources because its input has been // fully consumed. diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 6a57e917b4..cf492cc29d 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -76,7 +76,7 @@ Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status NewEsScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index e86060ad41..bce00af077 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -36,7 +36,7 @@ std::string NewJdbcScanNode::get_name() { Status NewJdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << "VNewJdbcScanNode::Prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 571566b88a..96dda34953 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -38,7 +38,7 @@ std::string NewOdbcScanNode::get_name() { Status NewOdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index ad978de41f..996d2b2ac4 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -37,7 +37,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index cda9cb6197..a7654e5df6 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -71,7 +71,7 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_init_profile()); @@ -89,7 +89,7 @@ Status VScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); @@ -109,7 +109,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next"); SCOPED_TIMER(_get_next_timer); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (state->is_cancelled()) { _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); return _scanner_ctx->status(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index e003adfdea..0d86bfc546 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -293,7 +293,7 @@ void AggregationNode::_init_hash_method(std::vector& probe_exprs) Status AggregationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime"); _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime"); @@ -311,7 +311,6 @@ Status AggregationNode::prepare(RuntimeState* state) { _hash_table_input_counter = ADD_COUNTER(runtime_profile(), "HashTableInputCount", TUnit::UNIT); _max_row_size_counter = ADD_COUNTER(runtime_profile(), "MaxRowSizeInBytes", TUnit::UNIT); COUNTER_SET(_max_row_size_counter, (int64_t)0); - _data_mem_tracker = std::make_unique("AggregationNode:Data"); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); @@ -450,7 +449,7 @@ Status AggregationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); @@ -494,7 +493,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_is_streaming_preagg) { bool child_eos = false; @@ -716,7 +715,7 @@ Status AggregationNode::_merge_without_key(Block* block) { } void AggregationNode::_update_memusage_without_key() { - _data_mem_tracker->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena); + mem_tracker_held()->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena); _mem_usage_record.used_in_arena = _agg_arena_pool->size(); } @@ -1321,10 +1320,10 @@ void AggregationNode::_update_memusage_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { auto& data = agg_method.data; - _data_mem_tracker->consume(_agg_arena_pool->size() - - _mem_usage_record.used_in_arena); - _data_mem_tracker->consume(data.get_buffer_size_in_bytes() - - _mem_usage_record.used_in_state); + mem_tracker_held()->consume(_agg_arena_pool->size() - + _mem_usage_record.used_in_arena); + mem_tracker_held()->consume(data.get_buffer_size_in_bytes() - + _mem_usage_record.used_in_state); _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes(); _mem_usage_record.used_in_arena = _agg_arena_pool->size(); }, @@ -1347,7 +1346,7 @@ void AggregationNode::_close_with_serialized_key() { } void AggregationNode::release_tracker() { - _data_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); + mem_tracker_held()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena); } void AggregationNode::_release_mem() { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 3612df001f..ccb28902d9 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -790,8 +790,6 @@ private: bool _use_fixed_length_serialization_opt; std::unique_ptr _mem_pool; - std::unique_ptr _data_mem_tracker; - size_t _align_aggregate_states = 1; /// The offset to the n-th aggregate function in a row of aggregate functions. Sizes _offsets_of_aggregate_states; diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 8aa1f8708a..3d3ea7313b 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -148,9 +148,9 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VAnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); - _mem_pool.reset(new MemPool(mem_tracker())); + _mem_pool.reset(new MemPool(mem_tracker_held())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); SCOPED_TIMER(_evaluation_timer); @@ -216,7 +216,7 @@ Status VAnalyticEvalNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(0)->open(state)); RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state)); @@ -254,7 +254,7 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VAnalyticEvalNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); if (_input_eos && _output_block_index == _input_blocks.size()) { diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index f164b5a93d..68bdca68d0 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -59,7 +59,7 @@ Status VBrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VBrokerScanNode::prepare(RuntimeState* state) { VLOG_QUERY << "VBrokerScanNode prepare"; RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _runtime_state = state; _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -85,7 +85,7 @@ Status VBrokerScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(start_scanners()); @@ -109,7 +109,7 @@ Status VBrokerScanNode::start_scanners() { Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // check if CANCELLED. if (state->is_cancelled()) { std::unique_lock l(_batch_queue_lock); @@ -273,7 +273,7 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner void VBrokerScanNode::scanner_worker(int start_idx, int length) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh_shared()); Thread::set_self_name("vbroker_scanner"); Status status = Status::OK(); ScannerCounter counter; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index c9cc87d6bd..59073c879a 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -49,7 +49,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); DCHECK_GT(_num_senders, 0); _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( @@ -66,7 +66,7 @@ Status VExchangeNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); @@ -84,7 +84,7 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (_num_rows_returned + block->rows() < _limit) { diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index c1c206a371..3a343147a3 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -50,7 +50,7 @@ Status VMysqlScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -101,7 +101,7 @@ Status VMysqlScanNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); VLOG_CRITICAL << "MysqlScanNode::Open"; if (nullptr == state) { diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index a50e91b510..8ae6482fd1 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -115,7 +115,7 @@ Status VSchemaScanNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); if (_scanner_param.user) { TSetSessionParams param; @@ -138,7 +138,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { } RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // new one mem pool _tuple_pool.reset(new (std::nothrow) MemPool()); diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 769e125524..32a9b43dcb 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -123,7 +123,7 @@ Status VSetOperationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); // open result expr lists. for (const std::vector& exprs : _child_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); @@ -135,7 +135,7 @@ Status VSetOperationNode::open(RuntimeState* state) { Status VSetOperationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 37e120b015..ed37e5cc24 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -68,7 +68,7 @@ Status VSortNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true"); RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); return Status::OK(); } @@ -77,7 +77,7 @@ Status VSortNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("vsort, while open.")); @@ -113,7 +113,7 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_ERROR(_sorter->get_next(state, block, eos)); reached_limit(block, eos);