From a0de08255d78feac8d6067c7d7c8a1ba1fb7c655 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 23 Sep 2023 19:27:45 +0800 Subject: [PATCH] [pipelineX](profile) Add necessary metrics (#24820) --- be/src/pipeline/exec/multi_cast_data_stream_source.h | 2 +- be/src/pipeline/exec/partition_sort_source_operator.cpp | 3 +++ be/src/pipeline/exec/partition_sort_source_operator.h | 2 ++ be/src/pipeline/pipeline_x/operator.h | 5 +++++ 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index e9093cff58..aa20272d07 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -177,7 +177,6 @@ class MultiCastDataStreamSinkLocalState final ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} - std::shared_ptr multi_cast_data_streamer(); friend class MultiCastDataStreamSinkOperatorX; friend class DataSinkOperatorX; using Base = PipelineXSinkLocalState; @@ -209,6 +208,7 @@ public: SourceState source_state) override { CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); auto st = local_state._shared_state->_multi_cast_data_streamer->push( diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 52d3470413..25ea12e203 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -33,6 +33,8 @@ Status PartitionSortSourceLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_close_timer); _shared_state->previous_row = nullptr; _shared_state->partition_sorts.clear(); return PipelineXLocalState::close(state); @@ -42,6 +44,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: SourceState& source_state) { RETURN_IF_CANCELLED(state); CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); output_block->clear_column_data(); { std::lock_guard lock(local_state._shared_state->buffer_mutex); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 859e2d8b58..01abff61f4 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -61,6 +61,8 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); _shared_state->previous_row = std::make_unique(); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 92f2b0b9da..cb6c8f1e3a 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -698,6 +698,9 @@ public: _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id()); _writer->set_dependency(_async_writer_dependency.get()); + + _wait_for_dependency_timer = ADD_TIMER( + _profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time"); return Status::OK(); } @@ -717,6 +720,8 @@ public: if (_closed) { return Status::OK(); } + COUNTER_SET(_wait_for_dependency_timer, + _async_writer_dependency->write_watcher_elapse_time()); if (_writer->need_normal_close()) { if (exec_status.ok() && !state->is_cancelled()) { RETURN_IF_ERROR(_writer->commit_trans());