From 3094815f8ff410023d16fcfe2e3fd7bbc1f7da1d Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Thu, 30 Mar 2023 09:51:03 +0800 Subject: [PATCH] [enhancement](profile) add blocks produced profile to track if output block is very small (#18217) --- be/src/runtime/plan_fragment_executor.cpp | 27 +++++++---------------- be/src/runtime/plan_fragment_executor.h | 3 +++ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 94c200a5f2..76b67ab099 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -205,6 +205,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // set up profile counters profile()->add_child(_plan->runtime_profile(), true, nullptr); _rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT); + _blocks_produced_counter = ADD_COUNTER(profile(), "BlocksProduced", TUnit::UNIT); _fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime"); VLOG_NOTICE << "plan_root=\n" << _plan->debug_string(); @@ -274,35 +275,23 @@ Status PlanFragmentExecutor::open() { } Status PlanFragmentExecutor::open_vectorized_internal() { + SCOPED_TIMER(profile()->total_time_counter()); { SCOPED_CPU_TIMER(_fragment_cpu_timer); - SCOPED_TIMER(profile()->total_time_counter()); RETURN_IF_ERROR(_plan->open(_runtime_state.get())); RETURN_IF_CANCELLED(_runtime_state); - } - if (_sink == nullptr) { - return Status::OK(); - } - { - SCOPED_CPU_TIMER(_fragment_cpu_timer); + if (_sink == nullptr) { + return Status::OK(); + } RETURN_IF_ERROR(_sink->open(runtime_state())); - } - - { auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }}; doris::vectorized::Block block; bool eos = false; while (!eos) { RETURN_IF_CANCELLED(_runtime_state); + RETURN_IF_ERROR(get_vectorized_internal(&block, &eos)); - { - SCOPED_CPU_TIMER(_fragment_cpu_timer); - RETURN_IF_ERROR(get_vectorized_internal(&block, &eos)); - } - - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_CPU_TIMER(_fragment_cpu_timer); // Collect this plan and sub plan statistics, and send to parent plan. if (_collect_query_statistics_with_every_batch) { _collect_query_statistics(); @@ -319,7 +308,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { } { - SCOPED_TIMER(profile()->total_time_counter()); _collect_query_statistics(); Status status; { @@ -339,7 +327,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* block, bool* eos) { while (!_done) { block->clear_column_data(_plan->row_desc().num_materialized_slots()); - SCOPED_TIMER(profile()->total_time_counter()); RETURN_IF_ERROR_AND_CHECK_SPAN( _plan->get_next_after_projects( _runtime_state.get(), block, &_done, @@ -351,6 +338,8 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* if (block->rows() > 0) { COUNTER_UPDATE(_rows_produced_counter, block->rows()); + // Not very sure, if should contain empty block + COUNTER_UPDATE(_blocks_produced_counter, block->rows()); break; } } diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 352edfccbe..46521004fb 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -185,6 +185,9 @@ private: // Number of rows returned by this fragment RuntimeProfile::Counter* _rows_produced_counter; + // Number of blocks returned by this fragment + RuntimeProfile::Counter* _blocks_produced_counter; + RuntimeProfile::Counter* _fragment_cpu_timer; // It is shared with BufferControlBlock and will be called in two different