From 8c0e13ab511e6f3c98eb2d9637ffeb3e60e06c6f Mon Sep 17 00:00:00 2001
From: TengJianPing <18241664+jacktengg@users.noreply.github.com>
Date: Mon, 5 Dec 2022 11:51:52 +0800
Subject: [PATCH] [improvement](profile) add detail memory counter for exec
 nodes (#14806)

* [improvement](profile) improve accuracy of memory usage and add detail
  memory counter

* fix
---
 be/src/vec/common/sort/heap_sorter.cpp        |  8 +++
 be/src/vec/common/sort/heap_sorter.h          |  3 +
 be/src/vec/common/sort/sorter.cpp             |  7 ++
 be/src/vec/common/sort/sorter.h               |  4 ++
 be/src/vec/common/sort/topn_sorter.cpp        |  8 +++
 be/src/vec/common/sort/topn_sorter.h          |  2 +
 be/src/vec/core/sort_cursor.h                 |  2 +
 .../exec/join/process_hash_table_probe_impl.h | 20 ++++--
 be/src/vec/exec/join/vhash_join_node.cpp      | 36 +++++++---
 be/src/vec/exec/join/vhash_join_node.h        | 21 +++---
 be/src/vec/exec/scan/new_olap_scan_node.cpp   |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp      | 11 ++-
 be/src/vec/exec/scan/vscan_node.cpp           |  8 ++-
 be/src/vec/exec/scan/vscan_node.h             |  3 +
 be/src/vec/exec/vaggregation_node.cpp         | 43 +++++++----
 be/src/vec/exec/vaggregation_node.h           | 11 +++
 be/src/vec/exec/vanalytic_eval_node.cpp       | 42 ++++++++---
 be/src/vec/exec/vanalytic_eval_node.h         |  2 +-
 be/src/vec/exec/vset_operation_node.cpp       |  2 +-
 be/src/vec/exec/vset_operation_node.h         |  4 +-
 be/src/vec/exec/vsort_node.cpp                | 16 ++++-
 be/src/vec/exec/vsort_node.h                  |  2 +
 be/src/vec/exprs/vectorized_agg_fn.cpp        |  3 +-
 be/src/vec/exprs/vectorized_agg_fn.h          |  2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp     | 72 +++++++++++--------
 be/src/vec/runtime/vdata_stream_recvr.h       |  2 +
 be/src/vec/sink/vdata_stream_sender.cpp       | 33 +++++++--
 27 files changed, 275 insertions(+), 94 deletions(-)

diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp
index 6ca7fae7b2..70a06ea7d1 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -24,6 +24,7 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
                        ObjectPool* pool, std::vector<bool>& is_asc_order,
                        std::vector<bool>& nulls_first, const RowDescriptor& row_desc)
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
+          _data_size(0),
           _heap_size(limit + offset),
           _heap(std::make_unique<SortingHeap>()),
           _topn_filter_rows(0),
@@ -93,6 +94,9 @@ Status HeapSorter::append_block(Block* block) {
             _heap->replace_top_if_less(std::move(cursor));
         }
     }
+    if (block_view->ref_count() > 1) {
+        _data_size += block_view->value().block.allocated_bytes();
+    }
     return Status::OK();
 }
@@ -171,4 +175,8 @@ Status HeapSorter::_prepare_sort_descs(Block* block) {
     return Status::OK();
 }
 
+size_t HeapSorter::data_size() const {
+    return _data_size;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h
index 6f644a9d92..f37053c49e 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -69,6 +69,8 @@ public:
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
+    size_t data_size() const override;
+
     static constexpr size_t HEAP_SORT_THRESHOLD = 1024;
 
 private:
@@ -76,6 +78,7 @@ private:
 
     Status _prepare_sort_descs(Block* block);
 
+    size_t _data_size;
     size_t _heap_size;
     std::unique_ptr<SortingHeap> _heap;
     Block _return_block;
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index f41ceeb5b7..0f1f4c451e 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -202,5 +202,12 @@ Status FullSorter::_do_sort() {
     }
     return Status::OK();
 }
 
+size_t FullSorter::data_size() const {
+    size_t size = _state->unsorted_block->allocated_bytes();
+    for (const auto& block : _state->sorted_blocks) {
+        size += block.allocated_bytes();
+    }
+    return size;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 856d740dbe..69d50e3d93 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -75,6 +75,8 @@ public:
 
     virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;
 
+    virtual size_t data_size() const = 0;
+
 protected:
     Status partial_sort(Block& src_block, Block& dest_block);
@@ -107,6 +109,8 @@ public:
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
+    size_t data_size() const override;
+
 private:
     bool _reach_limit() {
         return _state->unsorted_block->rows() > BUFFERED_BLOCK_SIZE ||
diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp
index 411a41efd5..2db7addce8 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -81,4 +81,12 @@ Status TopNSorter::_do_sort(Block* block) {
     return Status::OK();
 }
 
+size_t TopNSorter::data_size() const {
+    size_t size = _state->unsorted_block->allocated_bytes();
+    for (const auto& block : _state->sorted_blocks) {
+        size += block.allocated_bytes();
+    }
+    return size;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h
index 51df1bea5b..afbba86625 100644
--- a/be/src/vec/common/sort/topn_sorter.h
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -36,6 +36,8 @@ public:
 
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
+    size_t data_size() const override;
+
     static constexpr size_t TOPN_SORT_THRESHOLD = 256;
 
 private:
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index b13316fe40..bef88d90c8 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -75,6 +75,8 @@ public:
 
     HeapSortCursorBlockView& value() { return _reference; }
 
+    int ref_count() const { return _ref_count; }
+
 private:
     ~SharedHeapSortCursorBlockView() noexcept = default;
     int _ref_count;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 05499842cb..5f22e71992 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -173,8 +173,12 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
 
     if (probe_index == 0) {
+        size_t old_probe_keys_memory_usage = 0;
+        if (_arena) {
+            old_probe_keys_memory_usage = _arena->size();
+        }
         _arena.reset(new Arena());
-        if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
             if (_probe_keys.size() < probe_rows) {
                 _probe_keys.resize(probe_rows);
             }
@@ -183,10 +187,12 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                 _probe_keys[i] =
                         serialize_keys_to_pool_contiguous(i, keys_size, probe_raw_ptrs, *_arena);
             }
+            _join_node->_probe_arena_memory_usage->add(_arena->size() -
+                                                       old_probe_keys_memory_usage);
         }
     }
 
-    if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
         key_getter.set_serialized_keys(_probe_keys.data());
     }
@@ -348,8 +354,12 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
     KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
 
     if (probe_index == 0) {
+        size_t old_probe_keys_memory_usage = 0;
+        if (_arena) {
+            old_probe_keys_memory_usage = _arena->size();
+        }
         _arena.reset(new Arena());
-        if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
             if (_probe_keys.size() < probe_rows) {
                 _probe_keys.resize(probe_rows);
             }
@@ -359,9 +369,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                                                                    *_arena);
             }
         }
+        _join_node->_probe_arena_memory_usage->add(_arena->size() -
+                                                   old_probe_keys_memory_usage);
     }
 
-    if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
         key_getter.set_serialized_keys(_probe_keys.data());
     }
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index ea7fb70d7a..4cef9b9bd3 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -69,13 +69,12 @@ struct ProcessHashTableBuild {
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
-        int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes();
 
         Defer defer {[&]() {
             int64_t bucket_size = hash_table_ctx.hash_table.get_buffer_size_in_cells();
             int64_t filled_bucket_size = hash_table_ctx.hash_table.size();
             int64_t bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes();
-            _join_node->_mem_used += bucket_bytes - old_bucket_bytes;
+            COUNTER_SET(_join_node->_hash_table_memory_usage, bucket_bytes);
             COUNTER_SET(_join_node->_build_buckets_counter, bucket_size);
             COUNTER_SET(_join_node->_build_buckets_fill_counter, filled_bucket_size);
@@ -113,11 +112,15 @@ struct ProcessHashTableBuild {
         _build_side_hash_values.resize(_rows);
 
         auto& arena = *(_join_node->_arena);
+        auto old_build_arena_memory = arena.size();
         {
             SCOPED_TIMER(_build_side_compute_hash_timer);
-            if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+            if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
+                auto old_keys_memory = hash_table_ctx.keys_memory_usage;
                 hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows);
                 key_getter.set_serialized_keys(hash_table_ctx.keys.data());
+                _join_node->_build_arena_memory_usage->add(hash_table_ctx.keys_memory_usage -
+                                                           old_keys_memory);
             }
 
             for (size_t k = 0; k < _rows; ++k) {
@@ -135,7 +138,8 @@ struct ProcessHashTableBuild {
                         return Status::OK();
                     }
                 }
-                if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+                if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                      KeyGetter>::value) {
                     _build_side_hash_values[k] =
                             hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key);
                 } else {
@@ -192,6 +196,8 @@ struct ProcessHashTableBuild {
         }
 #undef EMPLACE_IMPL
 
+        _join_node->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory);
+
         COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
                        hash_table_ctx.hash_table.get_resize_timer_value());
         COUNTER_UPDATE(_join_node->_build_table_convert_timer,
@@ -248,7 +254,6 @@ private:
 
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
         : VJoinNodeBase(pool, tnode, descs),
-          _mem_used(0),
           _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
                              tnode.hash_join_node.is_broadcast_join),
           _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
@@ -349,6 +354,16 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
+    auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true);
+    runtime_profile()->add_child(memory_usage, false, nullptr);
+    _build_blocks_memory_usage = ADD_COUNTER(memory_usage, "BuildBlocks", TUnit::BYTES);
+    _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES);
+    _build_arena_memory_usage =
+            memory_usage->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES);
+    _probe_arena_memory_usage =
+            memory_usage->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES);
+
     // Build phase
     _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
     runtime_profile()->add_child(_build_phase_profile, false, nullptr);
@@ -647,6 +662,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
         MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
 
         uint8_t index = 0;
+        int64_t mem_used = 0;
         int64_t last_mem_used = 0;
         bool eos = false;
@@ -664,27 +680,30 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
             RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos),
                                            child(1)->get_next_span(), eos);
-            _mem_used += block.allocated_bytes();
+            mem_used += block.allocated_bytes();
 
             if (block.rows() != 0) {
                 SCOPED_TIMER(_build_side_merge_block_timer);
                 RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
             }
 
-            if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
+            if (UNLIKELY(mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
                 if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
                     return Status::NotSupported(
                             strings::Substitute("data size of right table in hash join > $0",
                                                 BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT));
                 }
                 _build_blocks->emplace_back(mutable_block.to_block());
+
+                COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[index].bytes());
+
                 // TODO: Rethink whether we should do this processing after we receive
                 // all build blocks; which approach is better?
                 RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index));
                 mutable_block = MutableBlock();
                 ++index;
-                last_mem_used = _mem_used;
+                last_mem_used = mem_used;
             }
         }
@@ -695,6 +714,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
                                     BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT));
             }
             _build_blocks->emplace_back(mutable_block.to_block());
+            COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[index].bytes());
             RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index));
         }
     }
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index a41d3c2f7b..4e58d2b7e9 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -50,6 +50,7 @@ struct SerializedHashTableContext {
     Iter iter;
     bool inited = false;
     std::vector<StringRef> keys;
+    size_t keys_memory_usage = 0;
 
     void serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) {
         if (keys.size() < num_rows) {
@@ -57,10 +58,12 @@ struct SerializedHashTableContext {
         }
 
         _arena.reset(new Arena());
+        keys_memory_usage = 0;
         size_t keys_size = key_columns.size();
         for (size_t i = 0; i < num_rows; ++i) {
             keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
         }
+        keys_memory_usage = _arena->size();
     }
 
     void init_once() {
@@ -74,17 +77,6 @@ private:
     std::unique_ptr<Arena> _arena;
 };
 
-template <class T>
-struct IsSerializedHashTableContextTraits {
-    constexpr static bool value = false;
-};
-
-template <typename Value, typename Mapped>
-struct IsSerializedHashTableContextTraits<
-        ColumnsHashing::HashMethodSerialized<Value, Mapped>> {
-    constexpr static bool value = true;
-};
-
 // T should be UInt32 UInt64 UInt128
 template <class T>
 struct PrimaryTypeHashTableContext {
@@ -251,9 +243,12 @@ private:
     RuntimeProfile::Counter* _build_side_compute_hash_timer;
     RuntimeProfile::Counter* _build_side_merge_block_timer;
 
-    RuntimeProfile* _build_phase_profile;
+    RuntimeProfile::Counter* _build_blocks_memory_usage;
+    RuntimeProfile::Counter* _hash_table_memory_usage;
+    RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage;
+    RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage;
 
-    int64_t _mem_used;
+    RuntimeProfile* _build_phase_profile;
 
     std::shared_ptr<Arena> _arena;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 007cd3c65a..b70cb5b88f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -46,8 +46,8 @@ Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
 }
 
 Status NewOlapScanNode::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(VScanNode::prepare(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    RETURN_IF_ERROR(VScanNode::prepare(state));
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 203ef8ed68..15b8131448 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -53,10 +53,13 @@ Status ScannerContext::init() {
 
     // The free blocks are used for the final output blocks of scanners,
     // so use _output_tuple_desc;
+    int64_t free_blocks_memory_usage = 0;
     for (int i = 0; i < pre_alloc_block_count; ++i) {
         auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size);
+        free_blocks_memory_usage += block->allocated_bytes();
         _free_blocks.emplace_back(block);
     }
+    _parent->_free_blocks_memory_usage->add(free_blocks_memory_usage);
 
 #ifndef BE_TEST
     // 3. get thread token
@@ -81,6 +84,7 @@ vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) {
         if (!_free_blocks.empty()) {
             auto block = _free_blocks.back();
             _free_blocks.pop_back();
+            _parent->_free_blocks_memory_usage->add(-block->allocated_bytes());
             return block;
         }
     }
@@ -92,18 +96,21 @@ vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) {
 
 void ScannerContext::return_free_block(vectorized::Block* block) {
     block->clear_column_data();
+    _parent->_free_blocks_memory_usage->add(block->allocated_bytes());
     std::lock_guard<std::mutex> l(_free_blocks_lock);
     _free_blocks.emplace_back(block);
 }
 
 void ScannerContext::append_blocks_to_queue(const std::vector<vectorized::Block*>& blocks) {
     std::lock_guard<std::mutex> l(_transfer_lock);
+    auto old_bytes_in_queue = _cur_bytes_in_queue;
     _blocks_queue.insert(_blocks_queue.end(), blocks.begin(), blocks.end());
     _update_block_queue_empty();
     for (auto b : blocks) {
         _cur_bytes_in_queue += b->allocated_bytes();
     }
     _blocks_queue_added_cv.notify_one();
+    _parent->_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
 }
 
 bool ScannerContext::empty_in_queue() {
@@ -144,7 +151,9 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
         *block = _blocks_queue.front();
         _blocks_queue.pop_front();
         _update_block_queue_empty();
-        _cur_bytes_in_queue -= (*block)->allocated_bytes();
+        auto block_bytes = (*block)->allocated_bytes();
+        _cur_bytes_in_queue -= block_bytes;
+        _parent->_queued_blocks_memory_usage->add(-block_bytes);
         return Status::OK();
     } else {
         *eos = _is_finished;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index c83669532c..5fed3257e3 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -83,13 +83,13 @@ Status VScanNode::prepare(RuntimeState* state) {
 }
 
 Status VScanNode::open(RuntimeState* state) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
     _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
@@ -142,6 +142,12 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
 
 Status VScanNode::_init_profile() {
     // 1. counters for scan node
+    auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true);
+    _runtime_profile->add_child(memory_usage, false, nullptr);
+    _queued_blocks_memory_usage =
+            memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES);
+    _free_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES);
+
     _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);
     _total_throughput_counter =
             runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter);
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 7e99cf3cdc..c641d45d01 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -258,6 +258,9 @@ protected:
     // Max num of scanner thread
     RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
 
+    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage;
+    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage;
+
 private:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter();
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 92ed1f2336..c690bc4521 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -291,6 +291,12 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
 }
 
 Status AggregationNode::prepare_profile(RuntimeState* state) {
+    auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true);
+    runtime_profile()->add_child(memory_usage, false, nullptr);
+    _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES);
+    _serialize_key_arena_memory_usage =
+            memory_usage->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES);
+
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
@@ -327,9 +333,8 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
     for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
         SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
         SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
-        RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(),
-                                                          _mem_pool.get(), intermediate_slot_desc,
-                                                          output_slot_desc));
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
+                state, child(0)->row_desc(), intermediate_slot_desc, output_slot_desc));
     }
 
     // set profile timer to evaluators
@@ -443,6 +448,9 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
 }
 
 Status AggregationNode::prepare(RuntimeState* state) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
     RETURN_IF_ERROR(ExecNode::prepare(state));
     RETURN_IF_ERROR(prepare_profile(state));
     return Status::OK();
@@ -510,7 +518,6 @@ Status AggregationNode::do_pre_agg(vectorized::Block* input_block,
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     if (_is_streaming_preagg) {
         bool child_eos = false;
@@ -522,12 +529,16 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
                     _children[0]->get_next_after_projects(state, &_preagg_block, &child_eos),
                     _children[0]->get_next_span(), child_eos);
         } while (_preagg_block.rows() == 0 && !child_eos);
-        if (_preagg_block.rows() != 0) {
-            RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block));
-        } else {
-            RETURN_IF_ERROR(pull(state, block, eos));
+        {
+            SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+            if (_preagg_block.rows() != 0) {
+                RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block));
+            } else {
+                RETURN_IF_ERROR(pull(state, block, eos));
+            }
         }
     } else {
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
        RETURN_IF_ERROR(pull(state, block, eos));
     }
     return Status::OK();
@@ -742,7 +753,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
 }
 
 void AggregationNode::_update_memusage_without_key() {
-    mem_tracker_held()->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena);
+    auto arena_memory_usage = _agg_arena_pool->size() - _mem_usage_record.used_in_arena;
+    mem_tracker_held()->consume(arena_memory_usage);
+    _serialize_key_arena_memory_usage->add(arena_memory_usage);
     _mem_usage_record.used_in_arena = _agg_arena_pool->size();
 }
@@ -1347,12 +1360,18 @@ void AggregationNode::_update_memusage_with_serialized_key() {
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
-                mem_tracker_held()->consume(_agg_arena_pool->size() -
-                                            _mem_usage_record.used_in_arena);
+                auto arena_memory_usage = _agg_arena_pool->size() +
+                                          _aggregate_data_container->memory_usage() -
+                                          _mem_usage_record.used_in_arena;
+                mem_tracker_held()->consume(arena_memory_usage);
                 mem_tracker_held()->consume(data.get_buffer_size_in_bytes() -
                                             _mem_usage_record.used_in_state);
+                _serialize_key_arena_memory_usage->add(arena_memory_usage);
+                COUNTER_UPDATE(_hash_table_memory_usage,
+                               data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state);
                 _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes();
-                _mem_usage_record.used_in_arena = _agg_arena_pool->size();
+                _mem_usage_record.used_in_arena =
+                        _agg_arena_pool->size() + _aggregate_data_container->memory_usage();
             },
             _agg_data->_aggregated_method_variant);
 }
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 1ef8518fee..b6b2660a00 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -60,6 +60,7 @@ struct AggregationMethodSerialized {
     Iterator iterator;
     bool inited = false;
     std::vector<StringRef> keys;
+    size_t keys_memory_usage = 0;
     AggregationMethodSerialized()
             : _serialized_key_buffer_size(0),
               _serialized_key_buffer(nullptr),
@@ -89,6 +90,7 @@ struct AggregationMethodSerialized {
             for (size_t i = 0; i < num_rows; ++i) {
                 keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
             }
+            keys_memory_usage = _arena->size();
         } else {
             _arena.reset();
             if (total_bytes > _serialized_key_buffer_size) {
@@ -106,6 +108,7 @@ struct AggregationMethodSerialized {
             for (const auto& column : key_columns) {
                 column->serialize_vec(keys, num_rows, max_one_row_byte_size);
             }
+            keys_memory_usage = _serialized_key_buffer_size;
         }
         return max_one_row_byte_size;
     }
@@ -648,6 +651,8 @@ public:
         _expand();
     }
 
+    int64_t memory_usage() const { return _arena_pool.size(); }
+
     template <typename KeyType>
     AggregateDataPtr append_data(const KeyType& key) {
         assert(sizeof(KeyType) == _size_of_key);
@@ -835,6 +840,9 @@ private:
     RuntimeProfile::Counter* _hash_table_input_counter;
     RuntimeProfile::Counter* _max_row_size_counter;
 
+    RuntimeProfile::Counter* _hash_table_memory_usage;
+    RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage;
+
     bool _is_streaming_preagg;
     Block _preagg_block = Block();
     bool _should_expand_hash_table = true;
@@ -882,10 +890,13 @@ private:
     void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
                                     const ColumnRawPtrs& key_columns, const size_t num_rows) {
         if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
+            auto old_keys_memory = agg_method.keys_memory_usage;
             SCOPED_TIMER(_serialize_key_timer);
             int64_t row_size = (int64_t)(agg_method.serialize_keys(key_columns, num_rows));
             COUNTER_SET(_max_row_size_counter, std::max(_max_row_size_counter->value(), row_size));
             state.set_serialized_keys(agg_method.keys.data());
+
+            _serialize_key_arena_memory_usage->add(agg_method.keys_memory_usage - old_keys_memory);
         }
     }
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 3d3ea7313b..ed1ac17767 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -146,11 +146,14 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status VAnalyticEvalNode::prepare(RuntimeState* state) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor));
-    _mem_pool.reset(new MemPool(mem_tracker_held()));
+
+    auto* memory_usage = runtime_profile()->create_child("MemoryUsage", true, true);
+    runtime_profile()->add_child(memory_usage, false, nullptr);
+    _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES);
 
     _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
     SCOPED_TIMER(_evaluation_timer);
@@ -159,7 +162,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
         SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
-        RETURN_IF_ERROR(_agg_functions[i]->prepare(state, child(0)->row_desc(), _mem_pool.get(),
+        RETURN_IF_ERROR(_agg_functions[i]->prepare(state, child(0)->row_desc(),
                                                    intermediate_slot_desc, output_slot_desc));
         _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
                                             !_agg_functions[i]->data_type()->is_nullable());
@@ -213,12 +216,16 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
 }
 
 Status VAnalyticEvalNode::open(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open");
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-    RETURN_IF_CANCELLED(state);
+    {
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+        START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open");
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(ExecNode::open(state));
+        RETURN_IF_CANCELLED(state);
+    }
     RETURN_IF_ERROR(child(0)->open(state));
+
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state));
     RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state));
     for (size_t i = 0; i < _agg_functions_size; ++i) {
@@ -254,15 +261,16 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
                                  "VAnalyticEvalNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_CANCELLED(state);
 
     if (_input_eos && _output_block_index == _input_blocks.size()) {
         *eos = true;
         return Status::OK();
     }
+
     RETURN_IF_ERROR(_executor.get_next(state, block, eos));
+
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
     reached_limit(block, eos);
     return Status::OK();
@@ -276,6 +284,7 @@ Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* bl
             break;
         }
 
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
         if (next_partition) {
             _executor.execute(_partition_by_start, _partition_by_end, _partition_by_start,
@@ -297,6 +306,7 @@ Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block,
             break;
         }
 
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
         while (_current_row_position < _partition_by_end.pos &&
                _window_end_position < current_block_rows) {
@@ -321,6 +331,7 @@ Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block,
             break;
         }
 
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
         while (_current_row_position < _partition_by_end.pos &&
                _window_end_position < current_block_rows) {
@@ -360,6 +371,9 @@ Status VAnalyticEvalNode::_consumed_block_and_init_partition(RuntimeState* state
         RETURN_IF_ERROR(_fetch_next_block_data(state)); //return true, fetch next block
         found_partition_end = _get_partition_by_end();  //calculate new partition end
     }
+
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
     if (_input_eos && _input_total_rows == 0) {
         *eos = true;
         return Status::OK();
     }
@@ -480,6 +494,8 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
         return Status::OK();
     }
 
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
     input_block_first_row_positions.emplace_back(_input_total_rows);
     size_t block_rows = block.rows();
     _input_total_rows += block_rows;
@@ -515,6 +531,10 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
         DCHECK_GE(result_col_id, 0);
         _ordey_by_column_idxs[i] = result_col_id;
     }
+
+    mem_tracker_held()->consume(block.allocated_bytes());
+    _blocks_memory_usage->add(block.allocated_bytes());
+
     //TODO: to improve this, a tip is to maintain a free queue
     //so the memory can be reused, with no need to new/delete again;
     _input_blocks.emplace_back(std::move(block));
@@ -571,7 +591,10 @@ void VAnalyticEvalNode::_insert_result_info(int64_t current_block_rows) {
 }
 
 Status VAnalyticEvalNode::_output_current_block(Block* block) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
     block->swap(std::move(_input_blocks[_output_block_index]));
+    _blocks_memory_usage->add(-block->allocated_bytes());
     if (_origin_cols.size() < block->columns()) {
         block->erase_not_in(_origin_cols);
     }
@@ -705,7 +728,6 @@ std::string VAnalyticEvalNode::debug_window_bound_string(TAnalyticWindowBoundary
 
 void VAnalyticEvalNode::_release_mem() {
     _agg_arena_pool = nullptr;
-    _mem_pool = nullptr;
 
     std::vector<Block> tmp_input_blocks;
     _input_blocks.swap(tmp_input_blocks);
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index 9597606f6c..c39340e94d 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -125,7 +125,6 @@ private:
     int64_t _rows_start_offset = 0;
     int64_t _rows_end_offset = 0;
     size_t _agg_functions_size = 0;
-    std::unique_ptr<MemPool> _mem_pool;
 
     /// The offset of the n-th function.
     std::vector<size_t> _offsets_of_aggregate_states;
@@ -146,6 +145,7 @@ private:
     std::vector<int> _origin_cols;
 
     RuntimeProfile::Counter* _evaluation_timer;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
 
     std::vector<bool> _change_to_nullable_flags;
 };
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 32a9b43dcb..d3e9c436d6 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -45,7 +45,7 @@ struct HashTableBuild {
 
         KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz, nullptr);
 
-        if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
             hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows);
             key_getter.set_serialized_keys(hash_table_ctx.keys.data());
         }
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 473042940a..b93464ba87 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -184,7 +184,7 @@ struct HashTableProbe {
 
         if (_probe_index == 0) {
             _arena.reset(new Arena());
-            if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+            if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
                 if (_probe_keys.size() < _probe_rows) {
                     _probe_keys.resize(_probe_rows);
                 }
@@ -196,7 +196,7 @@ struct HashTableProbe {
             }
         }
 
-        if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) {
+        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
             key_getter.set_serialized_keys(_probe_keys.data());
         }
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index c633a08113..9977f4815e 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -49,7 +49,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order,
                                      _nulls_first, row_desc));
         _reuse_mem = false;
-    } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit > 0 &&
+    } else if (_limit > 0 && row_desc.has_varlen_slots() &&
               _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) {
         _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset, _pool, _is_asc_order,
                                      _nulls_first, row_desc));
@@ -64,10 +64,15 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status VSortNode::prepare(RuntimeState* state) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true");
+
+    auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true);
+    _runtime_profile->add_child(memory_usage, false, nullptr);
+    _sort_blocks_memory_usage = ADD_COUNTER(memory_usage, "SortBlocks", TUnit::BYTES);
+
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
     return Status::OK();
 }
@@ -110,6 +115,7 @@ Status VSortNode::open(RuntimeState* state) {
         RETURN_IF_ERROR_AND_CHECK_SPAN(
                 child(0)->get_next_after_projects(state, upstream_block.get(), &eos),
                 child(0)->get_next_span(), eos);
+        SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         RETURN_IF_ERROR(sink(state, upstream_block.get(), eos));
         if (!_reuse_mem) {
             upstream_block.reset(new Block());
@@ -117,6 +123,10 @@ Status VSortNode::open(RuntimeState* state) {
     } while (!eos);
 
     child(0)->close(state);
+
+    mem_tracker_held()->consume(_sorter->data_size());
+    COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size());
+
     return Status::OK();
 }
@@ -132,9 +142,9 @@ Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_blo
 }
 
 Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
 
     return pull(state, block, eos);
 }
diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h
index 6da8e52e77..0a29d08c54 100644
--- a/be/src/vec/exec/vsort_node.h
+++ b/be/src/vec/exec/vsort_node.h
@@ -71,6 +71,8 @@ private:
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
 
+    RuntimeProfile::Counter* _sort_blocks_memory_usage;
+
     bool _reuse_mem;
 
     std::unique_ptr<Sorter> _sorter;
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 8f7527a15e..5e12f394eb 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -86,10 +86,9 @@ Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, const TSortIn
     return Status::OK();
 }
 
-Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, MemPool* pool,
+Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc,
                                const SlotDescriptor* intermediate_slot_desc,
                                const SlotDescriptor* output_slot_desc) {
-    DCHECK(pool != nullptr);
     DCHECK(intermediate_slot_desc != nullptr);
     DCHECK(_intermediate_slot_desc == nullptr);
     _output_slot_desc = output_slot_desc;
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index fa025edec1..bcc0b7e1ef 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -33,7 +33,7 @@ public:
     static Status create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info,
                          AggFnEvaluator** result);
 
-    Status prepare(RuntimeState* state, const RowDescriptor& desc, MemPool* pool,
+    Status prepare(RuntimeState* state, const RowDescriptor& desc,
                    const SlotDescriptor* intermediate_slot_desc,
                    const SlotDescriptor* output_slot_desc);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index ec9814a735..49cdea5988 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -78,6 +78,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block** next_block) {
     DCHECK(!_block_queue.empty());
     Block* result = _block_queue.front().second;
     _recvr->_num_buffered_bytes -= _block_queue.front().first;
+    _recvr->_blocks_memory_usage->add(-_block_queue.front().first);
     VLOG_ROW << "fetched #rows=" << result->rows();
     _block_queue.pop_front();
     _update_block_queue_empty();
@@ -103,32 +104,34 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
     // Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
     // limit memory via DataStreamRecvr::exceeds_limit.
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
-    std::lock_guard<std::mutex> l(_lock);
-    if (_is_cancelled) {
-        return;
-    }
-    auto iter = _packet_seq_map.find(be_number);
-    if (iter != _packet_seq_map.end()) {
-        if (iter->second >= packet_seq) {
-            LOG(WARNING) << fmt::format(
-                    "packet already exist [cur_packet_id= {} receive_packet_id={}]", iter->second,
-                    packet_seq);
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (_is_cancelled) {
             return;
         }
-        iter->second = packet_seq;
-    } else {
-        _packet_seq_map.emplace(be_number, packet_seq);
-    }
-    auto block_byte_size = pblock.ByteSizeLong();
-    COUNTER_UPDATE(_recvr->_bytes_received_counter, block_byte_size);
+        auto iter = _packet_seq_map.find(be_number);
+        if (iter != _packet_seq_map.end()) {
+            if (iter->second >= packet_seq) {
+                LOG(WARNING) << fmt::format(
+                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
+                        iter->second, packet_seq);
+                return;
+            }
+            iter->second = packet_seq;
+        } else {
+            _packet_seq_map.emplace(be_number, packet_seq);
+        }
+        auto pblock_byte_size = pblock.ByteSizeLong();
+        COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
 
-    if (_num_remaining_senders <= 0) {
-        DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
-        return;
-    }
+        if (_num_remaining_senders <= 0) {
+            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
+            return;
+        }
 
-    if (_is_cancelled) {
-        return;
+        if (_is_cancelled) {
+            return;
+        }
     }
 
     Block* block = nullptr;
@@ -139,7 +142,12 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
         COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
     }
 
+    auto block_byte_size = block->allocated_bytes();
     VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";
+
+    _recvr->_blocks_memory_usage->add(block_byte_size);
+
+    std::lock_guard<std::mutex> l(_lock);
     _block_queue.emplace_back(block_byte_size, block);
     _update_block_queue_empty();
     // if done is nullptr, this function can't delay this response
@@ -158,11 +166,14 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     // Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
     // limit memory via DataStreamRecvr::exceeds_limit.
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
-    std::unique_lock<std::mutex> l(_lock);
-    if (_is_cancelled || !block->rows()) {
-        return;
+    {
+        std::unique_lock<std::mutex> l(_lock);
+        if (_is_cancelled || !block->rows()) {
+            return;
+        }
     }
 
     Block* nblock = new Block(block->get_columns_with_type_and_name());
+    COUNTER_UPDATE(_recvr->_local_bytes_received_counter, nblock->bytes());
 
     // local exchange should copy the block content if use_move == false
     if (use_move) {
@@ -177,6 +188,10 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     materialize_block_inplace(*nblock);
 
     size_t block_size = nblock->bytes();
+
+    _recvr->_blocks_memory_usage->add(nblock->allocated_bytes());
+
+    std::unique_lock<std::mutex> l(_lock);
     _block_queue.emplace_back(block_size, nblock);
     _update_block_queue_empty();
     _data_arrival_cv.notify_one();
@@ -301,7 +316,11 @@ VDataStreamRecvr::VDataStreamRecvr(
     }
 
     // Initialize the counters
+    auto* memory_usage = _profile->create_child("MemoryUsage", true, true);
+    _profile->add_child(memory_usage, false, nullptr);
+    _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES);
     _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES);
+    _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
     _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
     _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
@@ -338,13 +357,11 @@ void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_num
                                  int64_t packet_seq, ::google::protobuf::Closure** done) {
     SCOPED_ATTACH_TASK(_state->query_mem_tracker(), print_id(_state->query_id()),
                        _fragment_instance_id);
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
@@ -359,7 +376,6 @@ bool VDataStreamRecvr::ready_to_read() {
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     if (!_is_merging) {
         Block* res = nullptr;
         RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 45af88cd50..386b8ef41f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -132,12 +132,14 @@ private:
     RuntimeProfile* _profile;
 
     RuntimeProfile::Counter* _bytes_received_counter;
+    RuntimeProfile::Counter* _local_bytes_received_counter;
     RuntimeProfile::Counter* _deserialize_row_batch_timer;
     RuntimeProfile::Counter* _first_batch_wait_total_timer;
     RuntimeProfile::Counter* _buffer_full_total_timer;
     RuntimeProfile::Counter* _data_arrival_timer;
     RuntimeProfile::Counter* _decompress_timer;
     RuntimeProfile::Counter* _decompress_bytes;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
 
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index db1d8c82d2..794b1b0323 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,6 +95,7 @@ Status Channel::send_current_block(bool eos) {
     if (_enable_local_exchange && is_local()) {
         return send_local_block(eos);
     }
+    SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
     auto block = _mutable_block->to_block();
     RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
     block.clear_column_data();
@@ -181,6 +182,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
     }
 
     if (_mutable_block == nullptr) {
+        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         _mutable_block.reset(new MutableBlock(block->clone_empty()));
     }
@@ -197,7 +199,10 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
             row_add = row_wait_add;
         }
 
-        _mutable_block->add_rows(block, begin, begin + row_add);
+        {
+            SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
+            _mutable_block->add_rows(block, begin, begin + row_add);
+        }
 
         row_wait_add -= row_add;
         begin += row_add;
@@ -233,6 +238,7 @@ Status Channel::close_internal() {
     if (_mutable_block != nullptr && _mutable_block->rows() > 0) {
         RETURN_IF_ERROR(send_current_block(true));
     } else {
+        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         RETURN_IF_ERROR(send_block(nullptr, true));
     }
     // Don't wait for the last packet to finish; leave it to close_wait.
@@ -458,7 +464,6 @@ Status VDataStreamSender::send(RuntimeState* state, RowBatch* batch) {
 Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VDataStreamSender::send")
     SCOPED_TIMER(_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
         // 1. serialize only when it is not a local exchange
         // 2. send the block
@@ -474,11 +479,16 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 RETURN_IF_ERROR(channel->send_local_block(block));
             }
         } else {
-            RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size()));
+            {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size()));
+            }
+
             for (auto channel : _channels) {
                 if (channel->is_local()) {
                     RETURN_IF_ERROR(channel->send_local_block(block));
                 } else {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                     RETURN_IF_ERROR(channel->send_block(_cur_pb_block, eos));
                 }
             }
@@ -492,6 +502,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         if (current_channel->is_local()) {
             RETURN_IF_ERROR(current_channel->send_local_block(block));
         } else {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
            RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block()));
             RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block(), eos));
             current_channel->ch_roll_pb_block();
@@ -505,7 +516,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 
         int result_size = _partition_expr_ctxs.size();
         int result[result_size];
-        RETURN_IF_ERROR(get_partition_column_result(block, result));
+        {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            RETURN_IF_ERROR(get_partition_column_result(block, result));
+        }
 
         // vectorized hash calculation
         int rows = block->rows();
@@ -536,7 +550,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             }
         }
 
-        Block::erase_useless_column(block, column_to_keep);
+        {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            Block::erase_useless_column(block, column_to_keep);
+        }
+
         RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block));
     } else {
         for (int j = 0; j < result_size; ++j) {
@@ -548,7 +566,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             hashes[i] = hashes[i] % element_size;
         }
 
-        Block::erase_useless_column(block, column_to_keep);
+        {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            Block::erase_useless_column(block, column_to_keep);
+        }
 
         RETURN_IF_ERROR(
                 channel_add_rows(_channel_shared_ptrs, element_size, hashes, rows, block));
     }