diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ae6e2944a7..9d731193c0 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -42,10 +42,8 @@ #include "util/debug_util.h" #include "util/runtime_profile.h" #include "util/uid_util.h" -#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/join/vnested_loop_join_node.h" #include "vec/exec/scan/new_es_scan_node.h" @@ -72,7 +70,6 @@ #include "vec/exec/vunion_node.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -#include "vec/runtime/vdatetime_value.h" #include "vec/utils/util.hpp" namespace doris { diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 7b5cf26032..817c8e26d9 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -32,7 +32,6 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(DataGenOperator, SourceOperator) Status DataGenOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(SourceOperator::open(state)); return _node->open(state); } diff --git a/be/src/pipeline/exec/mysql_scan_operator.cpp b/be/src/pipeline/exec/mysql_scan_operator.cpp index e62502ae17..80afa8ef5e 100644 --- a/be/src/pipeline/exec/mysql_scan_operator.cpp +++ b/be/src/pipeline/exec/mysql_scan_operator.cpp @@ -24,7 +24,6 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(MysqlScanOperator, SourceOperator) Status MysqlScanOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(SourceOperator::open(state)); return _node->open(state); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 187f68f58d..3fd4e34eba 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -25,13 +25,7 @@ class RuntimeState; namespace doris::pipeline { OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder) - : _operator_builder(operator_builder), - _runtime_profile(new RuntimeProfile(fmt::format( - "{} (id={})", _operator_builder->get_name(), _operator_builder->id()))), - _mem_tracker(std::make_unique(get_name() + ": " + _runtime_profile->name(), - _runtime_profile.get(), nullptr, - "PeakMemoryUsage")), - _is_closed(false) {} + : _operator_builder(operator_builder), _is_closed(false) {} bool OperatorBase::is_sink() const { return _operator_builder->is_sink(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d2a40ab553..2ef903b0e4 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -235,13 +235,10 @@ public: bool is_closed() const { return _is_closed; } - MemTracker* mem_tracker() const { return _mem_tracker.get(); } - const OperatorBuilderBase* operator_builder() const { return _operator_builder; } const RowDescriptor& row_desc(); - RuntimeProfile* runtime_profile() { return _runtime_profile.get(); } virtual std::string debug_string() const; int32_t id() const { return _operator_builder->id(); } @@ -249,11 +246,6 @@ protected: OperatorBuilderBase* _operator_builder; OperatorPtr _child; - std::unique_ptr _runtime_profile; - std::unique_ptr _mem_tracker; - // TODO pipeline Account for peak memory used by this operator - RuntimeProfile::Counter* _memory_used_counter = nullptr; - bool _is_closed = false; }; @@ -273,15 +265,9 @@ public: ~DataSinkOperator() override = default; - Status prepare(RuntimeState* state) override { - _sink->profile()->insert_child_head(_runtime_profile.get(), true); - return Status::OK(); - } + Status prepare(RuntimeState* state) override { return Status::OK(); } - Status open(RuntimeState* state) override { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - return _sink->open(state); - } + Status open(RuntimeState* state) override { return _sink->open(state); } Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { @@ -300,7 +286,6 @@ public: if (is_closed()) { return Status::OK(); } - _fresh_exec_timer(_sink); RETURN_IF_ERROR(_sink->close(state, state->query_status())); _is_closed = true; return Status::OK(); @@ -309,11 +294,6 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } protected: - void _fresh_exec_timer(NodeType* node) { - node->profile()->total_time_counter()->update( - _runtime_profile->total_time_counter()->value()); - } - NodeType* _sink; }; @@ -332,21 +312,18 @@ public: ~StreamingOperator() override = default; Status prepare(RuntimeState* state) override { - _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); _node->increase_ref(); _use_projection = _node->has_output_row_descriptor(); return Status::OK(); } Status open(RuntimeState* state) override { - SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(_node->alloc_resource(state)); return Status::OK(); } Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { - SCOPED_TIMER(_runtime_profile->total_time_counter()); return _node->sink(state, in_block, source_state == SourceState::FINISHED); } @@ -354,7 +331,6 @@ public: if (is_closed()) { return Status::OK(); } - _fresh_exec_timer(_node); if (!_node->decrease_ref()) { _node->release_resource(state); } @@ -364,7 +340,6 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { - SCOPED_TIMER(_runtime_profile->total_time_counter()); DCHECK(_child); auto input_block = _use_projection ? _node->get_clear_input_block() : block; RETURN_IF_ERROR(_child->get_block(state, input_block, source_state)); @@ -382,11 +357,6 @@ public: bool can_read() override { return _node->can_read(); } protected: - void _fresh_exec_timer(NodeType* node) { - node->runtime_profile()->total_time_counter()->update( - _runtime_profile->total_time_counter()->value()); - } - NodeType* _node; bool _use_projection; }; @@ -404,7 +374,6 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { - SCOPED_TIMER(this->_runtime_profile->total_time_counter()); auto& node = StreamingOperator::_node; bool eos = false; RETURN_IF_ERROR(node->get_next_after_projects( @@ -439,7 +408,6 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { - SCOPED_TIMER(this->_runtime_profile->total_time_counter()); auto& node = StreamingOperator::_node; auto& child = StreamingOperator::_child; diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index c58d92392d..2eae24cd21 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -32,7 +32,6 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(SchemaScanOperator, SourceOperator) Status SchemaScanOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); return _node->open(state); } diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 29276f4112..3a9e502969 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -42,8 +42,8 @@ StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator Status StreamingAggSinkOperator::prepare(RuntimeState* state) { RETURN_IF_ERROR(StreamingOperator::prepare(state)); _queue_byte_size_counter = - ADD_COUNTER(_runtime_profile.get(), "MaxSizeInBlockQueue", TUnit::BYTES); - _queue_size_counter = ADD_COUNTER(_runtime_profile.get(), "MaxSizeOfBlockQueue", TUnit::UNIT); + ADD_COUNTER(_node->runtime_profile(), "MaxSizeInBlockQueue", TUnit::BYTES); + _queue_size_counter = ADD_COUNTER(_node->runtime_profile(), "MaxSizeOfBlockQueue", TUnit::UNIT); return Status::OK(); } @@ -54,7 +54,6 @@ bool StreamingAggSinkOperator::can_write() { Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); Status ret = Status::OK(); if (in_block && in_block->rows() > 0) { auto block_from_ctx = _data_queue->get_free_block(); diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index a293052232..c343253903 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -41,7 +41,6 @@ bool StreamingAggSourceOperator::can_read() { Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); bool eos = false; if (!_data_queue->data_exhausted()) { std::unique_ptr agg_block; diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 5c40081340..9296df78d9 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -49,7 +49,6 @@ OperatorPtr UnionSinkOperatorBuilder::build_operator() { Status UnionSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); if (_output_block == nullptr) { _output_block = _data_queue->get_free_block(_cur_child_id); }