From 28a2e7108426e214bf985d793c735d2fbaf4c348 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 28 Aug 2023 14:38:07 +0800 Subject: [PATCH] [pipelineX](refactor) refine codes (#23521) * [pipelineX](refactor) refine codes * update * update --- .../exec/aggregation_sink_operator.cpp | 2 +- .../pipeline/exec/analytic_sink_operator.cpp | 2 -- .../pipeline/exec/exchange_sink_operator.cpp | 1 - .../exec/exchange_source_operator.cpp | 4 --- be/src/pipeline/exec/operator.cpp | 2 ++ be/src/pipeline/exec/operator.h | 1 - be/src/pipeline/exec/result_sink_operator.cpp | 5 +-- be/src/pipeline/exec/result_sink_operator.h | 1 - be/src/pipeline/exec/scan_operator.cpp | 34 ++++++------------- be/src/pipeline/exec/sort_sink_operator.cpp | 2 +- 10 files changed, 16 insertions(+), 38 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index a20522f149..97b4a6ee03 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -59,6 +59,7 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st _max_row_size_counter(nullptr) {} Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); _dependency = (AggDependency*)info.dependency; _shared_state = (AggSharedState*)_dependency->shared_state(); _agg_data = _shared_state->agg_data.get(); @@ -78,7 +79,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { for (size_t i = 0; i < _shared_state->probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _shared_state->probe_expr_ctxs[i])); } - _profile = p._pool->add(new RuntimeProfile("AggSinkLocalState")); _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); _hash_table_memory_usage = ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage"); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 1ebe342e85..2d38040e7f 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -28,14 +28,12 @@ OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator) Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); - _mem_tracker = std::make_unique("ExchangeSinkLocalState:"); auto& p = _parent->cast(); _dependency = (AnalyticDependency*)info.dependency; _shared_state = (AnalyticSharedState*)_dependency->shared_state(); _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - _profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkLocalState")); _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage"); _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 5168916be6..ba04caae9b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -127,7 +127,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf instances.emplace_back(channel->get_fragment_instance_id_str()); } std::string title = "VDataStreamSender (dst_id={}, dst_fragments=[{}])"; - _profile = p._pool->add(new RuntimeProfile(title)); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index dadf70f0de..34654bed83 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -44,14 +44,10 @@ ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* paren : PipelineXLocalState(state, parent), num_rows_skipped(0), is_ready(false) {} Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { - if (_init) { - return Status::OK(); - } RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); stream_recvr = info.recvr; RETURN_IF_ERROR(_parent->cast()._vsort_exec_exprs.clone( state, vsort_exec_exprs)); - _init = true; return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 658df5f3cd..edc7b86247 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -63,6 +63,8 @@ std::string OperatorBase::debug_string() const { } Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name())); _mem_tracker = std::make_unique(_parent->get_name()); return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2fb484afc0..fd4e5604d9 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -576,7 +576,6 @@ protected: RuntimeState* _state; vectorized::VExprContextSPtrs _conjuncts; vectorized::VExprContextSPtrs _projections; - bool _init = false; bool _closed = false; vectorized::Block _origin_block; }; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index ce8a933d7e..a9a115d6d5 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -51,12 +51,9 @@ bool ResultSinkOperator::can_write() { } Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); auto& p = _parent->cast(); auto fragment_instance_id = state->fragment_instance_id(); - auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})", - fragment_instance_id.hi, fragment_instance_id.lo); - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(title)); // create sender _sender = info.sender; _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index e98bae86e7..dc49c7c5ad 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -59,7 +59,6 @@ private: std::shared_ptr _sender; std::shared_ptr _writer; - RuntimeProfile* _profile; // Allocated from _pool }; class ResultSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 236421ae13..bcae613ffe 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -110,15 +110,9 @@ bool ScanLocalState::should_run_serial() const { } Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { - if (_init) { - return Status::OK(); - } - - auto& p = _parent->cast(); - - set_scan_ranges(info.scan_ranges); - RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); + auto& p = _parent->cast(); + set_scan_ranges(info.scan_ranges); _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) { RETURN_IF_ERROR( @@ -154,9 +148,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); } - RETURN_IF_ERROR(status); - _init = true; - return Status::OK(); + return status; } Status ScanLocalState::_normalize_conjuncts() { @@ -1221,21 +1213,17 @@ ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const Des bool ScanOperatorX::can_read(RuntimeState* state) { auto& local_state = state->get_local_state(id())->cast(); - if (!local_state._init) { + if (local_state._eos || local_state._scanner_ctx->done()) { + // _eos: need eos + // _scanner_ctx->done(): need finish + // _scanner_ctx->no_schedule(): should schedule _scanner_ctx return true; } else { - if (local_state._eos || local_state._scanner_ctx->done()) { - // _eos: need eos - // _scanner_ctx->done(): need finish - // _scanner_ctx->no_schedule(): should schedule _scanner_ctx - return true; - } else { - if (local_state._scanner_ctx->get_num_running_scanners() == 0 && - local_state._scanner_ctx->has_enough_space_in_blocks_queue()) { - local_state._scanner_ctx->reschedule_scanner_ctx(); - } - return local_state.ready_to_read(); // there are some blocks to process + if (local_state._scanner_ctx->get_num_running_scanners() == 0 && + local_state._scanner_ctx->has_enough_space_in_blocks_queue()) { + local_state._scanner_ctx->reschedule_scanner_ctx(); } + return local_state.ready_to_read(); // there are some blocks to process } } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index af655b896c..059b1a16e8 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -29,12 +29,12 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator) Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); auto& p = _parent->cast(); _dependency = (SortDependency*)info.dependency; _shared_state = (SortSharedState*)_dependency->shared_state(); RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); - _profile = p._pool->add(new RuntimeProfile("SortSinkLocalState")); switch (p._algorithm) { case SortAlgorithm::HEAP_SORT: { _shared_state->sorter = vectorized::HeapSorter::create_unique(