From b137f03921a96d10b3ed98b571ce35aa18452f99 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Tue, 31 Oct 2023 17:53:52 +0800 Subject: [PATCH] [feature](profile) add MaxActiveTime and add exectime in pipelineX (#26148) --- .../pipeline/exec/aggregation_sink_operator.cpp | 8 ++++---- .../exec/aggregation_source_operator.cpp | 6 +++--- be/src/pipeline/exec/analytic_sink_operator.cpp | 4 ++-- .../pipeline/exec/analytic_source_operator.cpp | 6 +++--- .../pipeline/exec/assert_num_rows_operator.cpp | 2 +- ...inct_streaming_aggregation_sink_operator.cpp | 4 ++-- ...ct_streaming_aggregation_source_operator.cpp | 2 +- be/src/pipeline/exec/exchange_sink_operator.cpp | 6 +++--- .../pipeline/exec/exchange_source_operator.cpp | 8 ++++---- be/src/pipeline/exec/hashjoin_build_sink.cpp | 6 +++--- .../pipeline/exec/hashjoin_probe_operator.cpp | 6 +++--- .../pipeline/exec/jdbc_table_sink_operator.cpp | 2 +- .../pipeline/exec/multi_cast_data_stream_sink.h | 2 +- .../exec/multi_cast_data_stream_source.cpp | 4 ++-- .../exec/nested_loop_join_build_operator.cpp | 4 ++-- .../exec/nested_loop_join_probe_operator.cpp | 4 ++-- .../pipeline/exec/olap_table_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/olap_table_sink_operator.h | 4 ++-- .../exec/partition_sort_sink_operator.cpp | 4 ++-- .../exec/partition_sort_source_operator.cpp | 4 ++-- be/src/pipeline/exec/repeat_operator.cpp | 2 +- .../pipeline/exec/result_file_sink_operator.cpp | 8 ++++---- be/src/pipeline/exec/result_sink_operator.cpp | 16 +++++++--------- be/src/pipeline/exec/scan_operator.cpp | 17 +++++++---------- be/src/pipeline/exec/schema_scan_operator.cpp | 4 ++-- be/src/pipeline/exec/select_operator.h | 2 +- .../pipeline/exec/set_probe_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/set_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/set_source_operator.cpp | 4 ++-- be/src/pipeline/exec/sort_sink_operator.cpp | 5 ++--- be/src/pipeline/exec/sort_source_operator.cpp | 2 +- .../streaming_aggregation_sink_operator.cpp | 2 +- .../streaming_aggregation_source_operator.cpp | 2 +- be/src/pipeline/exec/union_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/union_source_operator.cpp | 4 ++-- .../local_exchange_sink_operator.cpp | 4 ++-- .../local_exchange_source_operator.cpp | 4 ++-- be/src/pipeline/pipeline_x/operator.cpp | 8 +++++--- be/src/pipeline/pipeline_x/operator.h | 7 ++++--- .../doris/common/util/RuntimeProfile.java | 6 +++++- 40 files changed, 100 insertions(+), 99 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 93b2e2d968..9eb49d234f 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -67,7 +67,7 @@ template Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(Base::profile()->total_time_counter()); + SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_open_timer); _agg_data = Base::_shared_state->agg_data.get(); _agg_arena_pool = Base::_shared_state->agg_arena_pool.get(); @@ -160,7 +160,7 @@ Status AggSinkLocalState::init(RuntimeState* state, template Status AggSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(Base::profile()->total_time_counter()); + SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_open_timer); RETURN_IF_ERROR(Base::open(state)); _agg_data = Base::_shared_state->agg_data.get(); @@ -784,7 +784,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); local_state._shared_state->input_num_rows += in_block->rows(); if (in_block->rows() > 0) { @@ -804,7 +804,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, template Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { - SCOPED_TIMER(Base::profile()->total_time_counter()); + SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_close_timer); if (Base::_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 482a263887..bb290cfbd2 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -40,7 +40,7 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _agg_data = _shared_state->agg_data.get(); _get_results_timer = ADD_TIMER(profile(), "GetResultsTime"); @@ -509,7 +509,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._executor.get_result(state, block, source_state)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg @@ -528,7 +528,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) { } Status AggLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 7109fa9dea..23a4838dff 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -28,7 +28,7 @@ OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator) Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); @@ -134,7 +134,7 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) { Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); local_state._shared_state->input_eos = source_state == SourceState::FINISHED; if (local_state._shared_state->input_eos && input_block->rows() == 0) { diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 7c310db123..c705d849a3 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -38,7 +38,7 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* paren Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _agg_arena_pool = std::make_unique(); @@ -379,7 +379,7 @@ Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._shared_state->input_eos && (local_state._output_block_index == local_state._shared_state->input_blocks.size() || local_state._shared_state->input_total_rows == 0)) { @@ -415,7 +415,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block } Status AnalyticLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index f58f2568cf..baddce5a94 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -38,7 +38,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); local_state.add_num_rows_returned(block->rows()); int64_t num_rows_returned = local_state.num_rows_returned(); bool assert_res = false; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index ef2551124a..8a240d6336 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -194,7 +194,7 @@ Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeSt Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); local_state._shared_state->input_num_rows += in_block->rows(); Status ret = Status::OK(); @@ -235,7 +235,7 @@ Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state, Status exe if (_closed) { return Status::OK(); } - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_shared_state->data_queue && !_shared_state->data_queue->is_finish()) { // finish should be set, if not set here means error. diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp index 43456f10e3..ffdbab0127 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp @@ -107,7 +107,7 @@ DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectP Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); std::unique_ptr agg_block; RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block)); if (agg_block != nullptr) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 0ca38d81c1..e8d52b8eaa 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -98,7 +98,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _sender_id = info.sender_id; @@ -300,7 +300,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block SourceState source_state) { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto channel : local_state.channels) { @@ -502,7 +502,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->write_watcher_elapse_time()); if (_broadcast_dependency) { diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 86d7c3728d..a37272fd74 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -45,7 +45,7 @@ ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* paren Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( @@ -72,7 +72,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { } Status ExchangeLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); return Status::OK(); @@ -125,7 +125,7 @@ Status ExchangeSourceOperatorX::open(RuntimeState* state) { Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (_is_merging && !local_state.is_ready) { RETURN_IF_ERROR(local_state.stream_recvr->create_merger( local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, @@ -168,7 +168,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block } Status ExchangeLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 1af98cd77a..cb63f64ab4 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -46,7 +46,7 @@ HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _shared_hash_table_dependency = SharedHashTableDependency::create_shared(_parent->operator_id()); @@ -132,7 +132,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo } Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); auto& p = _parent->cast(); @@ -402,7 +402,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); // make one block for each 4 gigabytes diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 61f244c8de..67b36efcbf 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -32,7 +32,7 @@ HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, OperatorXB Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _shared_state->probe_ignore_null = p._probe_ignore_null; @@ -60,7 +60,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) } Status HashJoinProbeLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(JoinProbeLocalState::open(state)); @@ -122,7 +122,7 @@ vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() { } Status HashJoinProbeLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index ae8fcba3d1..fc90af3591 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -58,7 +58,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) { Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state.sink(state, block, source_state)); return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index 83c5390a2f..008238577f 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -77,7 +77,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c8036f79ba..61038baa32 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -135,7 +135,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); static_cast(_dependency)->set_consumer_id(p._consumer_id); @@ -155,7 +155,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, SourceState& source_state) { //auto& local_state = get_local_state(state); auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); bool eos = false; vectorized::Block tmp_block; vectorized::Block* output_block = block; diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 152bec5d7d..f525a1abf1 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -32,7 +32,7 @@ NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOpe Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _shared_state->join_op_variants = p._join_op_variants; @@ -93,7 +93,7 @@ Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); auto rows = block->rows(); auto mem_usage = block->allocated_bytes(); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index ca51d0f271..f73b84c538 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -52,7 +52,7 @@ NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _join_conjuncts.resize(p._join_conjuncts.size()); @@ -66,7 +66,7 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& } Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp b/be/src/pipeline/exec/olap_table_sink_operator.cpp index 999bf921d5..f5f9da0813 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.cpp +++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp @@ -31,7 +31,7 @@ OperatorPtr OlapTableSinkOperatorBuilder::build_operator() { Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit)); @@ -43,7 +43,7 @@ Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) { return Status::OK(); } SCOPED_TIMER(_close_timer); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); if (_closed) { return _close_status; } diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index 877a42937d..ad35f79728 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -54,7 +54,7 @@ public: : Base(parent, state) {}; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); return Base::open(state); } @@ -95,7 +95,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); return local_state.sink(state, in_block, source_state); } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 5abbecbef8..d0c3774a58 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -30,7 +30,7 @@ OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() { Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); auto& p = _parent->cast(); RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); _partition_expr_ctxs.resize(p._partition_expr_ctxs.size()); @@ -98,7 +98,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SourceState source_state) { auto& local_state = get_local_state(state); auto current_rows = input_block->rows(); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (current_rows > 0) { local_state.child_input_rows = local_state.child_input_rows + current_rows; if (UNLIKELY(_partition_exprs_num == 0)) { diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 84a20a2f64..dfbe1fd40f 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -31,7 +31,7 @@ OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() { Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); @@ -43,7 +43,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: SourceState& source_state) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); SCOPED_TIMER(local_state._get_next_timer); output_block->clear_column_data(); { diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index ce00746380..40f6f5d7b2 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -52,7 +52,7 @@ RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent) Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _expr_ctxs.resize(p._expr_ctxs.size()); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index ea765055cd..e572bfe93e 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -101,7 +101,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) { Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _sender_id = info.sender_id; @@ -153,7 +153,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i } Status ResultFileSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); return Base::open(state); } @@ -163,7 +163,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) return Status::OK(); } SCOPED_TIMER(_close_timer); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); auto& p = _parent->cast(); if (_closed) { @@ -264,7 +264,7 @@ void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelP Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); return local_state.sink(state, in_block, source_state); } diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 12c06794a9..b65a5ba1b8 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -52,7 +52,7 @@ bool ResultSinkOperator::can_write() { Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name); @@ -79,7 +79,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) } Status ResultSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); auto& p = _parent->cast(); @@ -139,7 +139,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); } @@ -173,14 +173,12 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { } SCOPED_TIMER(_close_timer); COUNTER_UPDATE(_wait_for_queue_timer, _queue_dependency->write_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), _queue_dependency->write_watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _queue_dependency->write_watcher_elapse_time()); COUNTER_SET(_wait_for_buffer_timer, _buffer_dependency->write_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), - _buffer_dependency->write_watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _buffer_dependency->write_watcher_elapse_time()); COUNTER_SET(_wait_for_cancel_timer, _cancel_dependency->write_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), - _cancel_dependency->write_watcher_elapse_time()); - SCOPED_TIMER(profile()->total_time_counter()); + COUNTER_UPDATE(exec_time_counter(), _cancel_dependency->write_watcher_elapse_time()); + SCOPED_TIMER(exec_time_counter()); Status final_status = exec_status; if (_writer) { // close the writer diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 7187604525..075090916a 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -118,7 +118,7 @@ bool ScanLocalState::should_run_serial() const { template Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); @@ -167,7 +167,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) template Status ScanLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); if (_open_dependency == nullptr) { return Status::OK(); @@ -1344,21 +1344,18 @@ Status ScanLocalState::close(RuntimeState* state) { SCOPED_TIMER(_close_timer); if (_data_ready_dependency) { COUNTER_UPDATE(_wait_for_data_timer, _data_ready_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), - _data_ready_dependency->read_watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _data_ready_dependency->read_watcher_elapse_time()); } if (_eos_dependency) { COUNTER_SET(_wait_for_eos_timer, _eos_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), - _eos_dependency->read_watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _eos_dependency->read_watcher_elapse_time()); } if (_scanner_done_dependency) { COUNTER_SET(_wait_for_scanner_done_timer, _scanner_done_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(profile()->total_time_counter(), - _scanner_done_dependency->read_watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _scanner_done_dependency->read_watcher_elapse_time()); } - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); if (_scanner_ctx.get()) { _scanner_ctx->clear_and_join(reinterpret_cast(this), state); } @@ -1371,7 +1368,7 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: SourceState& source_state) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state._get_next_timer); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); // in inverted index apply logic, in order to optimize query performance, // we built some temporary columns into block, these columns only used in scan node level, // remove them when query leave scan node to avoid other nodes use block->columns() to make a wrong decision diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 430315b998..da16453c90 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -47,7 +47,7 @@ Status SchemaScanOperator::close(RuntimeState* state) { Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _scanner_param.common_param = p._common_scanner_param; @@ -212,7 +212,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); bool schema_eos = false; diff --git a/be/src/pipeline/exec/select_operator.h b/be/src/pipeline/exec/select_operator.h index 6c998c8ab3..d75c0a7848 100644 --- a/be/src/pipeline/exec/select_operator.h +++ b/be/src/pipeline/exec/select_operator.h @@ -61,7 +61,7 @@ public: Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 81f0c986b7..36503df2db 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -106,7 +106,7 @@ Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized SourceState source_state) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (_cur_child_id > 1) { @@ -141,7 +141,7 @@ Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized template Status SetProbeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); Parent& parent = _parent->cast(); static_cast(_dependency)->set_cur_child_id(parent._cur_child_id); diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 07205e6c8a..90cc792d47 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -56,7 +56,7 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); auto& mem_used = local_state._shared_state->mem_used; @@ -160,7 +160,7 @@ Status SetSinkOperatorX::_extract_build_column( template Status SetSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _build_timer = ADD_TIMER(_profile, "BuildTime"); diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index e679274d92..d3840285c3 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -63,7 +63,7 @@ Status SetSourceLocalState::init(RuntimeState* state, LocalStateIn template Status SetSourceLocalState::open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(PipelineXLocalState::open(state)); auto& child_exprs_lists = _shared_state->child_exprs_lists; @@ -94,7 +94,7 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, vectoriz SourceState& source_state) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); _create_mutable_cols(local_state, block); auto st = std::visit( [&](auto&& arg) -> Status { diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index cc41fbc4bf..5471be54e5 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -30,7 +30,7 @@ OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator) Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); @@ -61,7 +61,6 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _shared_state->sorter->init_profile(_profile); - SCOPED_TIMER(_profile->total_time_counter()); _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true"); _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); @@ -139,7 +138,7 @@ Status SortSinkOperatorX::open(RuntimeState* state) { Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 6732094272..e13e518f76 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -35,7 +35,7 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); bool eos = false; RETURN_IF_ERROR_OR_CATCH_EXCEPTION( local_state._shared_state->sorter->get_next(state, block, &eos)); diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 3d246ec9d9..9d8292c558 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -354,7 +354,7 @@ Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* sta Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); local_state._shared_state->input_num_rows += in_block->rows(); Status ret = Status::OK(); diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index 45d2e1902f..353f997ce7 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -81,7 +81,7 @@ StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (!local_state._shared_state->data_queue->data_exhausted()) { std::unique_ptr agg_block; DCHECK(local_state._dependency->read_blocked_by() == nullptr); diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 9b07fb6a83..db43869a77 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -96,7 +96,7 @@ Status UnionSinkOperator::close(RuntimeState* state) { Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _child_expr.resize(p._child_expr.size()); @@ -146,7 +146,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) { Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (local_state._output_block == nullptr) { local_state._output_block = diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 1f27ab920c..8e5179734c 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -119,7 +119,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { } RETURN_IF_ERROR(Base::init(state, info)); ss->data_queue.set_dependency(_dependency); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); // Const exprs materialized by this node. These exprs don't refer to any children. // Only materialized by the first fragment instance to avoid duplication. @@ -152,7 +152,7 @@ std::shared_ptr UnionSourceLocalState::create_shared_state() { Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._need_read_for_const_expr) { if (has_more_const(state)) { static_cast(get_next_const(state, block)); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index b69150ba51..eba75c0fc1 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -21,7 +21,7 @@ namespace doris::pipeline { Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); @@ -68,7 +68,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state, Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); { SCOPED_TIMER(local_state._compute_hash_value_timer); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 75d9b4ac47..127e14dba6 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -21,7 +21,7 @@ namespace doris::pipeline { Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _dependency->set_shared_state(info.local_exchange_state); _shared_state = (LocalExchangeSharedState*)_dependency->shared_state(); @@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); PartitionedBlock partitioned_block; std::unique_ptr mutable_block = nullptr; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index cc22c96deb..88b21b754f 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -175,7 +175,7 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { auto local_state = state->get_local_state(operator_id()); - SCOPED_TIMER(local_state->profile()->total_time_counter()); + SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); using namespace vectorized; vectorized::MutableBlock mutable_block = @@ -357,6 +357,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); + _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); _mem_tracker = std::make_unique("PipelineXLocalState:" + _runtime_profile->name()); _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage"); _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( @@ -406,6 +407,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); + _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); _mem_tracker = std::make_unique(_parent->get_name()); return Status::OK(); @@ -448,14 +450,14 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori return Status::OK(); } { - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR( push(state, local_state._child_block.get(), local_state._child_source_state)); } } if (!need_more_input_data(state)) { - SCOPED_TIMER(local_state.profile()->total_time_counter()); + SCOPED_TIMER(local_state.exec_time_counter()); SourceState new_state = SourceState::DEPEND_ON_SOURCE; RETURN_IF_ERROR(pull(state, block, new_state)); if (new_state == SourceState::FINISHED) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 79762e6ee1..d2f170f9f2 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -86,7 +86,7 @@ public: MemTracker* mem_tracker() { return _mem_tracker.get(); } RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; } RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; } - + RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } OperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } @@ -120,6 +120,7 @@ protected: RuntimeProfile::Counter* _memory_used_counter; RuntimeProfile::Counter* _wait_for_finish_dependency_timer; RuntimeProfile::Counter* _projection_timer; + RuntimeProfile::Counter* _exec_timer; // Account for peak memory used by this node RuntimeProfile::Counter* _peak_memory_usage_counter; RuntimeProfile::Counter* _open_timer = nullptr; @@ -369,7 +370,7 @@ public: } RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } - + RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } virtual WriteDependency* dependency() { return nullptr; } FinishDependency* finishdependency() { return _finish_dependency.get(); } @@ -396,7 +397,7 @@ protected: RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer; RuntimeProfile::Counter* _wait_for_finish_dependency_timer; - + RuntimeProfile::Counter* _exec_timer; std::shared_ptr _finish_dependency; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 4e0602e807..5f5c5cb6c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -127,7 +127,11 @@ public class RuntimeProfile { } public Counter getCounterTotalTime() { - return counterTotalTime; + Counter totalTimeCounter = counterMap.get("TotalTime"); + if (totalTimeCounter == null) { + return counterTotalTime; + } + return totalTimeCounter; } public int nodeId() {