[enhancement](memtracker) Optimize exec node memory tracking (#14711)

This commit is contained in:
Xinyi Zou
2022-12-01 14:52:21 +08:00
committed by GitHub
parent b4d32a0c44
commit 176f519fa1
49 changed files with 170 additions and 151 deletions

View File

@ -293,7 +293,7 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
Status AggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
@ -311,7 +311,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
_hash_table_input_counter = ADD_COUNTER(runtime_profile(), "HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(runtime_profile(), "MaxRowSizeInBytes", TUnit::UNIT);
COUNTER_SET(_max_row_size_counter, (int64_t)0);
_data_mem_tracker = std::make_unique<MemTracker>("AggregationNode:Data");
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
@ -450,7 +449,7 @@ Status AggregationNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@ -494,7 +493,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_is_streaming_preagg) {
bool child_eos = false;
@ -716,7 +715,7 @@ Status AggregationNode::_merge_without_key(Block* block) {
}
void AggregationNode::_update_memusage_without_key() {
_data_mem_tracker->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena);
mem_tracker_held()->consume(_agg_arena_pool->size() - _mem_usage_record.used_in_arena);
_mem_usage_record.used_in_arena = _agg_arena_pool->size();
}
@ -1321,10 +1320,10 @@ void AggregationNode::_update_memusage_with_serialized_key() {
std::visit(
[&](auto&& agg_method) -> void {
auto& data = agg_method.data;
_data_mem_tracker->consume(_agg_arena_pool->size() -
_mem_usage_record.used_in_arena);
_data_mem_tracker->consume(data.get_buffer_size_in_bytes() -
_mem_usage_record.used_in_state);
mem_tracker_held()->consume(_agg_arena_pool->size() -
_mem_usage_record.used_in_arena);
mem_tracker_held()->consume(data.get_buffer_size_in_bytes() -
_mem_usage_record.used_in_state);
_mem_usage_record.used_in_state = data.get_buffer_size_in_bytes();
_mem_usage_record.used_in_arena = _agg_arena_pool->size();
},
@ -1347,7 +1346,7 @@ void AggregationNode::_close_with_serialized_key() {
}
void AggregationNode::release_tracker() {
_data_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
mem_tracker_held()->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
}
void AggregationNode::_release_mem() {