[enhancement](profilev2) add some fields for profile v2 (#25611)
Add 3 counters for ExecNode: ExecTime - Total execution time(excluding the execution time of children). OutputBytes - The total number of bytes output to parent. BlockCount - The total count of blocks output to parent.
This commit is contained in:
@ -109,7 +109,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
|
||||
_hash_table_input_counter(nullptr),
|
||||
_build_timer(nullptr),
|
||||
_expr_timer(nullptr),
|
||||
_exec_timer(nullptr),
|
||||
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
|
||||
_intermediate_tuple_desc(nullptr),
|
||||
_output_tuple_id(tnode.agg_node.output_tuple_id),
|
||||
@ -257,7 +256,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
|
||||
_build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1);
|
||||
_build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime");
|
||||
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
|
||||
_exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
|
||||
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
|
||||
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
|
||||
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
|
||||
@ -417,11 +415,13 @@ Status AggregationNode::prepare(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
|
||||
RETURN_IF_ERROR(ExecNode::prepare(state));
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
RETURN_IF_ERROR(prepare_profile(state));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
|
||||
|
||||
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
|
||||
@ -472,6 +472,7 @@ Status AggregationNode::open(RuntimeState* state) {
|
||||
|
||||
Status AggregationNode::do_pre_agg(vectorized::Block* input_block,
|
||||
vectorized::Block* output_block) {
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
RETURN_IF_ERROR(_executor.pre_agg(input_block, output_block));
|
||||
|
||||
// pre stream agg need use _num_row_return to decide whether to do pre stream agg
|
||||
@ -510,6 +511,7 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
}
|
||||
|
||||
Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
|
||||
_make_nullable_output_key(block);
|
||||
// dispose the having clause, should not be execute in prestreaming agg
|
||||
@ -520,6 +522,7 @@ Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* bloc
|
||||
}
|
||||
|
||||
Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) {
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
if (in_block->rows() > 0) {
|
||||
RETURN_IF_ERROR(_executor.execute(in_block));
|
||||
RETURN_IF_ERROR(_try_spill_disk());
|
||||
|
||||
Reference in New Issue
Block a user