From 10be583e52249b3ff94bf9a4120e858516ee5ec9 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Fri, 30 Dec 2022 09:56:33 +0800 Subject: [PATCH] [chore](pipeline) optimize profile information (#15433) --- .../pipeline/exec/aggregation_source_operator.h | 2 +- be/src/pipeline/exec/assert_num_rows_operator.h | 2 +- be/src/pipeline/exec/const_value_operator.h | 2 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 12 +++--------- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/set_source_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.cpp | 2 +- be/src/pipeline/exec/union_source_operator.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 13 ++++++------- be/src/pipeline/pipeline_task.cpp | 17 ++++++++++------- be/src/pipeline/pipeline_task.h | 2 ++ be/src/vec/exec/join/vhash_join_node.cpp | 5 +++-- be/src/vec/exec/vexchange_node.cpp | 1 + be/src/vec/exec/vrepeat_node.cpp | 2 -- be/src/vec/exec/vselect_node.cpp | 1 - be/src/vec/runtime/vdata_stream_recvr.h | 1 + 18 files changed, 35 insertions(+), 37 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 04c28abd40..f1c93071da 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -38,7 +38,7 @@ class AggSourceOperator final : public SourceOperator public: AggSourceOperator(OperatorBuilderBase*, ExecNode*); // if exec node split to: sink, source operator. the source operator - // should skip `alloc_resoucre()` function call, only sink operator + // should skip `alloc_resource()` function call, only sink operator // call the function Status open(RuntimeState*) override { return Status::OK(); } }; diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 40acfd05f8..a3f9702dd6 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -27,7 +27,7 @@ namespace pipeline { class AssertNumRowsOperatorBuilder final : public OperatorBuilder { public: AssertNumRowsOperatorBuilder(int32_t id, ExecNode* node) - : OperatorBuilder(id, "AssertNumRowsOperatorBuilder", node) {}; + : OperatorBuilder(id, "AssertNumRowsOperator", node) {}; OperatorPtr build_operator() override; }; diff --git a/be/src/pipeline/exec/const_value_operator.h b/be/src/pipeline/exec/const_value_operator.h index 467f5ded05..567501ccf5 100644 --- a/be/src/pipeline/exec/const_value_operator.h +++ b/be/src/pipeline/exec/const_value_operator.h @@ -29,7 +29,7 @@ namespace pipeline { class ConstValueOperatorBuilder final : public OperatorBuilder { public: ConstValueOperatorBuilder(int32_t id, ExecNode* node) - : OperatorBuilder(id, "ConstValueOperatorBuilder", node) {}; + : OperatorBuilder(id, "ConstValueOperator", node) {}; OperatorPtr build_operator() override; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 7ea3e47546..54105e7f8b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -36,7 +36,7 @@ class HashJoinProbeOperator final : public StatefulOperatorget_name(); }; bool is_sink() const; @@ -257,8 +257,6 @@ public: ~DataSinkOperator() override = default; - std::string get_name() const override { return "DataSinkOperator"; } - Status prepare(RuntimeState* state) override { RETURN_IF_ERROR(_sink->prepare(state)); _runtime_profile.reset(new RuntimeProfile( @@ -317,8 +315,6 @@ public: ~StreamingOperator() override = default; - std::string get_name() const override { return "StreamingOperator"; } - Status prepare(RuntimeState* state) override { _runtime_profile.reset(new RuntimeProfile( fmt::format("{} (id={})", _operator_builder->get_name(), _operator_builder->id()))); @@ -387,10 +383,9 @@ public: ~SourceOperator() override = default; - std::string get_name() const override { return "SourceOperator"; } - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { + SCOPED_TIMER(this->_runtime_profile->total_time_counter()); auto& node = StreamingOperator::_node; bool eos = false; RETURN_IF_ERROR(node->get_next_after_projects( @@ -423,10 +418,9 @@ public: virtual ~StatefulOperator() = default; - std::string get_name() const override { return "DataStateOperator"; } - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { + SCOPED_TIMER(this->_runtime_profile->total_time_counter()); auto& node = StreamingOperator::_node; auto& child = StreamingOperator::_child; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 0687d0aa64..899bee2324 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -33,7 +33,7 @@ class SetProbeSinkOperatorBuilder final : public OperatorBuilder> { private: constexpr static auto builder_name = - is_intersect ? "IntersectProbeSinkOperatorBuilder" : "ExceptProbeSinkOperatorBuilder"; + is_intersect ? "IntersectProbeSinkOperator" : "ExceptProbeSinkOperator"; public: SetProbeSinkOperatorBuilder(int32_t id, int child_id, ExecNode* set_node); diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 2b8d17a137..bd93592769 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -33,7 +33,7 @@ class SetSinkOperatorBuilder final : public OperatorBuilder> { private: constexpr static auto builder_name = - is_intersect ? "IntersectSinkOperatorBuilder" : "ExceptSinkOperatorBuilder"; + is_intersect ? "IntersectSinkOperator" : "ExceptSinkOperator"; public: SetSinkOperatorBuilder(int32_t id, ExecNode* set_node); diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index 4a90162759..99ea8e5242 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -33,7 +33,7 @@ class SetSourceOperatorBuilder : public OperatorBuilder> { private: constexpr static auto builder_name = - is_intersect ? "IntersectSourceOperatorBuilder" : "ExceptSourceOperatorBuilder"; + is_intersect ? "IntersectSourceOperator" : "ExceptSourceOperator"; public: SetSourceOperatorBuilder(int32_t id, ExecNode* set_node); diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 8e02877cdd..488d2dc58a 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -23,7 +23,7 @@ namespace doris::pipeline { UnionSinkOperatorBuilder::UnionSinkOperatorBuilder(int32_t id, int child_id, ExecNode* node, std::shared_ptr queue) - : OperatorBuilder(id, "UnionSinkOperatorBuilder", node), + : OperatorBuilder(id, "UnionSinkOperator", node), _cur_child_id(child_id), _data_queue(queue) {}; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index c4f3250175..d6f0fcf399 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -31,7 +31,7 @@ namespace pipeline { UnionSourceOperatorBuilder::UnionSourceOperatorBuilder(int32_t id, ExecNode* node, std::shared_ptr queue) - : OperatorBuilder(id, "UnionSourceOperatorBuilder", node), _data_queue(queue) {}; + : OperatorBuilder(id, "UnionSourceOperator", node), _data_queue(queue) {}; OperatorPtr UnionSourceOperatorBuilder::build_operator() { return std::make_shared(this, _node, _data_queue); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 4cfeb036ae..1430c8d6f6 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -305,13 +305,12 @@ Status PipelineFragmentContext::_build_pipeline_tasks( // TODO: use virtual function to do abstruct Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) { - auto* fragment_context = this; auto node_type = node->type(); switch (node_type) { // for source case TPlanNodeType::BROKER_SCAN_NODE: { - OperatorBuilderPtr operator_t = std::make_shared( - fragment_context->next_operator_builder_id(), node); + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } @@ -320,8 +319,8 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur case TPlanNodeType::ODBC_SCAN_NODE: case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::ES_SCAN_NODE: { - OperatorBuilderPtr operator_t = std::make_shared( - fragment_context->next_operator_builder_id(), node); + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } @@ -332,8 +331,8 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { - OperatorBuilderPtr operator_t = std::make_shared( - fragment_context->next_operator_builder_id(), node); + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index b63576b947..3cd5f7f1e3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -35,6 +35,8 @@ void PipelineTask::_init_profile() { _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); + _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT); + _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); } @@ -48,13 +50,15 @@ Status PipelineTask::prepare(RuntimeState* state) { RETURN_IF_ERROR(o->prepare(state)); } - _task_profile->add_info_string("SinkId", fmt::format("{}", _sink->id())); + _task_profile->add_info_string("Sink", fmt::format("{}({})", _sink->get_name(), _sink->id())); fmt::memory_buffer operator_ids_str; for (size_t i = 0; i < _operators.size(); i++) { if (i == 0) { - fmt::format_to(operator_ids_str, fmt::format("[{}", _operators[i]->id())); + fmt::format_to(operator_ids_str, + fmt::format("[{}({})", _operators[i]->get_name(), _operators[i]->id())); } else { - fmt::format_to(operator_ids_str, fmt::format(", {}", _operators[i]->id())); + fmt::format_to(operator_ids_str, + fmt::format(", {}({})", _operators[i]->get_name(), _operators[i]->id())); } } fmt::format_to(operator_ids_str, "]"); @@ -205,14 +209,13 @@ void PipelineTask::set_state(PipelineTaskState state) { _wait_sink_watcher.stop(); } } else if (_cur_state == RUNNABLE) { + COUNTER_UPDATE(_block_counts, 1); if (state == BLOCKED_FOR_SOURCE) { _wait_source_watcher.start(); - COUNTER_UPDATE(_block_counts, 1); + COUNTER_UPDATE(_block_by_source_counts, 1); } else if (state == BLOCKED_FOR_SINK) { _wait_sink_watcher.start(); - COUNTER_UPDATE(_block_counts, 1); - } else if (state == BLOCKED_FOR_DEPENDENCY) { - COUNTER_UPDATE(_block_counts, 1); + COUNTER_UPDATE(_block_by_sink_counts, 1); } } _cur_state = state; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index f14669f41a..be267b8efa 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -193,6 +193,8 @@ private: RuntimeProfile::Counter* _sink_timer; RuntimeProfile::Counter* _get_block_timer; RuntimeProfile::Counter* _block_counts; + RuntimeProfile::Counter* _block_by_source_counts; + RuntimeProfile::Counter* _block_by_sink_counts; RuntimeProfile::Counter* _schedule_counts; MonotonicStopWatch _wait_source_watcher; RuntimeProfile::Counter* _wait_source_timer; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 70bf40cce0..a71a6c931c 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -460,6 +460,7 @@ void HashJoinNode::prepare_for_next() { Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block, bool* eos) { + SCOPED_TIMER(_probe_timer); if (_short_circuit_for_null_in_probe_side) { // If we use a short-circuit strategy for null value in build side (e.g. if join operator is // NULL_AWARE_LEFT_ANTI_JOIN), we should return empty block directly. @@ -550,7 +551,7 @@ Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* out Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) { _probe_eos = eos; if (input_block->rows() > 0) { - COUNTER_UPDATE(_probe_rows_counter, _probe_block.rows()); + COUNTER_UPDATE(_probe_rows_counter, input_block->rows()); int probe_expr_ctxs_sz = _probe_expr_ctxs.size(); _probe_columns.resize(probe_expr_ctxs_sz); @@ -585,7 +586,6 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "HashJoinNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_TIMER(_probe_timer); if (_short_circuit_for_null_in_probe_side) { // If we use a short-circuit strategy for null value in build side (e.g. if join operator is @@ -659,6 +659,7 @@ void HashJoinNode::_prepare_probe_block() { Status HashJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::open(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 9a30ee743e..4e3b596988 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -119,6 +119,7 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { *eos = true; auto limit = _limit - _num_rows_returned; block->set_num_rows(limit); + _num_rows_returned = _limit; } COUNTER_SET(_rows_returned_counter, _num_rows_returned); } diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 882619930e..aa32990598 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -171,8 +171,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl } Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_CANCELLED(state); DCHECK(_repeat_id_idx >= 0); for (const std::vector& v : _grouping_list) { diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index b8f10d57be..cd8924abb4 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -63,7 +63,6 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSelectNode::pull"); - SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index cddd21e6b0..087ffe1987 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -261,6 +261,7 @@ public: _data_arrival_cv.notify_one(); _recvr->_num_buffered_bytes += block_size; + COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size); } }; } // namespace vectorized