[enhancement](profile) add blocks produced profile to track if output block is very small (#18217)
This commit is contained in:
@ -205,6 +205,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
|
||||
// set up profile counters
|
||||
profile()->add_child(_plan->runtime_profile(), true, nullptr);
|
||||
_rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
|
||||
_blocks_produced_counter = ADD_COUNTER(profile(), "BlocksProduced", TUnit::UNIT);
|
||||
_fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
|
||||
|
||||
VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
|
||||
@ -274,35 +275,23 @@ Status PlanFragmentExecutor::open() {
|
||||
}
|
||||
|
||||
Status PlanFragmentExecutor::open_vectorized_internal() {
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
{
|
||||
SCOPED_CPU_TIMER(_fragment_cpu_timer);
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
|
||||
RETURN_IF_CANCELLED(_runtime_state);
|
||||
}
|
||||
if (_sink == nullptr) {
|
||||
return Status::OK();
|
||||
}
|
||||
{
|
||||
SCOPED_CPU_TIMER(_fragment_cpu_timer);
|
||||
if (_sink == nullptr) {
|
||||
return Status::OK();
|
||||
}
|
||||
RETURN_IF_ERROR(_sink->open(runtime_state()));
|
||||
}
|
||||
|
||||
{
|
||||
auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
|
||||
doris::vectorized::Block block;
|
||||
bool eos = false;
|
||||
|
||||
while (!eos) {
|
||||
RETURN_IF_CANCELLED(_runtime_state);
|
||||
RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
|
||||
|
||||
{
|
||||
SCOPED_CPU_TIMER(_fragment_cpu_timer);
|
||||
RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
|
||||
}
|
||||
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
SCOPED_CPU_TIMER(_fragment_cpu_timer);
|
||||
// Collect this plan and sub plan statistics, and send to parent plan.
|
||||
if (_collect_query_statistics_with_every_batch) {
|
||||
_collect_query_statistics();
|
||||
@ -319,7 +308,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
_collect_query_statistics();
|
||||
Status status;
|
||||
{
|
||||
@ -339,7 +327,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
|
||||
Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* block, bool* eos) {
|
||||
while (!_done) {
|
||||
block->clear_column_data(_plan->row_desc().num_materialized_slots());
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(
|
||||
_plan->get_next_after_projects(
|
||||
_runtime_state.get(), block, &_done,
|
||||
@ -351,6 +338,8 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
|
||||
|
||||
if (block->rows() > 0) {
|
||||
COUNTER_UPDATE(_rows_produced_counter, block->rows());
|
||||
// Not very sure, if should contain empty block
|
||||
COUNTER_UPDATE(_blocks_produced_counter, block->rows());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,6 +185,9 @@ private:
|
||||
// Number of rows returned by this fragment
|
||||
RuntimeProfile::Counter* _rows_produced_counter;
|
||||
|
||||
// Number of blocks returned by this fragment
|
||||
RuntimeProfile::Counter* _blocks_produced_counter;
|
||||
|
||||
RuntimeProfile::Counter* _fragment_cpu_timer;
|
||||
|
||||
// It is shared with BufferControlBlock and will be called in two different
|
||||
|
||||
Reference in New Issue
Block a user