From b30cd86e9e3ec16dbead0c0fa31814cb6d32514f Mon Sep 17 00:00:00 2001 From: HappenLee Date: Mon, 5 Dec 2022 18:35:00 +0800 Subject: [PATCH] [Refactor](pipeline) Refactor operator and builder code of pipeline (#14787) --- be/src/exec/exec_node.cpp | 9 - be/src/exec/exec_node.h | 16 +- be/src/pipeline/CMakeLists.txt | 5 +- .../exec/aggregation_sink_operator.cpp | 53 +--- .../pipeline/exec/aggregation_sink_operator.h | 36 +-- .../exec/aggregation_source_operator.cpp | 41 +-- .../exec/aggregation_source_operator.h | 22 +- be/src/pipeline/exec/empty_set_operator.cpp | 20 +- be/src/pipeline/exec/empty_set_operator.h | 31 +- .../pipeline/exec/exchange_sink_operator.cpp | 58 ++-- be/src/pipeline/exec/exchange_sink_operator.h | 42 +-- .../exec/exchange_source_operator.cpp | 31 +- .../pipeline/exec/exchange_source_operator.h | 31 +- be/src/pipeline/exec/olap_scan_operator.h | 46 --- be/src/pipeline/exec/operator.cpp | 39 +-- be/src/pipeline/exec/operator.h | 300 ++++++++++++++---- be/src/pipeline/exec/repeat_operator.cpp | 56 +--- be/src/pipeline/exec/repeat_operator.h | 29 +- be/src/pipeline/exec/result_sink_operator.cpp | 45 +-- be/src/pipeline/exec/result_sink_operator.h | 37 +-- be/src/pipeline/exec/scan_operator.cpp | 30 +- be/src/pipeline/exec/scan_operator.h | 25 +- be/src/pipeline/exec/sort_sink_operator.cpp | 27 +- be/src/pipeline/exec/sort_sink_operator.h | 37 +-- be/src/pipeline/exec/sort_source_operator.cpp | 30 +- be/src/pipeline/exec/sort_source_operator.h | 35 +- .../streaming_aggregation_sink_operator.cpp | 37 +-- .../streaming_aggregation_sink_operator.h | 41 +-- .../streaming_aggregation_source_operator.cpp | 38 +-- .../streaming_aggregation_source_operator.h | 30 +- ...erator.cpp => table_function_operator.cpp} | 15 +- .../pipeline/exec/table_function_operator.h | 68 +--- be/src/pipeline/pipeline.cpp | 1 - be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 59 ++-- be/src/pipeline/task_scheduler.h | 1 - be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 - be/src/vec/exec/scan/vscan_node.h | 2 + be/src/vec/exec/vaggregation_node.h | 4 +- be/src/vec/sink/vdata_stream_sender.h | 2 + be/src/vec/sink/vresult_sink.h | 2 + 41 files changed, 481 insertions(+), 953 deletions(-) delete mode 100644 be/src/pipeline/exec/olap_scan_operator.h rename be/src/pipeline/exec/{olap_scan_operator.cpp => table_function_operator.cpp} (57%) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 14b8690d5d..cfbc9a50ca 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -861,15 +861,6 @@ Status ExecNode::get_next_after_projects(RuntimeState* state, vectorized::Block* return get_next(state, block, eos); } -Status ExecNode::execute(RuntimeState* state, vectorized::Block* input_block, - vectorized::Block* output_block, bool* eos) { - return Status::NotSupported("{} not implements execute", get_name()); -} - -Status ExecNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { - return Status::NotSupported("{} not implements pull", get_name()); -} - Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { return Status::NotSupported("{} not implements sink", get_name()); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 4d02c7df45..eaf367f6b9 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -55,7 +55,7 @@ class VExpr; namespace pipeline { class PipelineFragmentContext; class Pipeline; -class Operator; +class OperatorBase; } // namespace pipeline using std::string; @@ -118,14 +118,11 @@ public: // new interface to compatible new optimizers in FE Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos); - // Process data - // Eg: Projection, Union All, HashProbe - virtual Status execute(RuntimeState* state, vectorized::Block* input_block, - vectorized::Block* output_block, bool* eos); - // Emit data, both need impl with method: sink - // Eg: Aggregation, Sort - virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos); + // Eg: Aggregation, Sort, Scan + virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { + return get_next(state, output_block, eos); + } virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) { return Status::OK(); @@ -247,7 +244,6 @@ public: protected: friend class DataSink; - friend class doris::pipeline::Operator; /// Initialize 'buffer_pool_client_' and claim the initial reservation for this /// ExecNode. Only needs to be called by ExecNodes that will use the client. @@ -411,7 +407,7 @@ protected: std::atomic _can_read = false; private: - friend class pipeline::Operator; + friend class pipeline::OperatorBase; bool _is_closed; bool _is_resource_released = false; std::atomic_int _ref; // used by pipeline operator to release resource. diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 71060233ef..436d4e70f2 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -28,7 +28,6 @@ set(PIPELINE_FILES task_scheduler.cpp exec/operator.cpp exec/scan_operator.cpp - exec/olap_scan_operator.cpp exec/empty_set_operator.cpp exec/exchange_source_operator.cpp exec/exchange_sink_operator.cpp @@ -41,7 +40,9 @@ set(PIPELINE_FILES exec/agg_context.cpp exec/sort_source_operator.cpp exec/sort_sink_operator.cpp - exec/repeat_operator.cpp) + exec/repeat_operator.cpp + exec/table_function_operator.cpp + ) add_library(Pipeline STATIC ${PIPELINE_FILES} diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index d9459bf56d..ae1205ce1a 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -21,57 +21,6 @@ namespace doris::pipeline { -AggSinkOperator::AggSinkOperator(AggSinkOperatorBuilder* operator_builder, - vectorized::AggregationNode* agg_node) - : Operator(operator_builder), _agg_node(agg_node) {} +OPERATOR_CODE_GENERATOR(AggSinkOperator, Operator) -Status AggSinkOperator::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Operator::prepare(state)); - _agg_node->increase_ref(); - return Status::OK(); -} - -Status AggSinkOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(Operator::open(state)); - RETURN_IF_ERROR(_agg_node->alloc_resource(state)); - return Status::OK(); -} - -bool AggSinkOperator::can_write() { - return true; -} - -Status AggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - return _agg_node->sink(state, in_block, source_state == SourceState::FINISHED); -} - -Status AggSinkOperator::close(RuntimeState* state) { - _fresh_exec_timer(_agg_node); - if (!_agg_node->decrease_ref()) { - _agg_node->release_resource(state); - } - return Status::OK(); -} - -/////////////////////////////// operator template //////////////////////////////// - -AggSinkOperatorBuilder::AggSinkOperatorBuilder(int32_t id, const std::string& name, - vectorized::AggregationNode* exec_node) - : OperatorBuilder(id, name, exec_node), _agg_node(exec_node) {} - -OperatorPtr AggSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _agg_node); -} - -// use final aggregation source operator -bool AggSinkOperatorBuilder::is_sink() const { - return true; -} - -bool AggSinkOperatorBuilder::is_source() const { - return false; -} } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index fc74f2366d..21d9ad7d20 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -28,37 +28,19 @@ class Block; } // namespace vectorized namespace pipeline { -class AggSinkOperatorBuilder; -class AggSinkOperator : public Operator { + +class AggSinkOperatorBuilder final : public OperatorBuilder { public: - AggSinkOperator(AggSinkOperatorBuilder* operator_builder, vectorized::AggregationNode*); - - Status prepare(RuntimeState*) override; - Status open(RuntimeState* state) override; - - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; - - bool can_write() override; - - Status close(RuntimeState* state) override; - - Status finalize(doris::RuntimeState* state) override { return Status::OK(); } - -private: - vectorized::AggregationNode* _agg_node; -}; - -class AggSinkOperatorBuilder : public OperatorBuilder { -public: - AggSinkOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*); + AggSinkOperatorBuilder(int32_t, ExecNode*); OperatorPtr build_operator() override; + bool is_sink() const override { return true; }; +}; - bool is_sink() const override; - bool is_source() const override; - -private: - vectorized::AggregationNode* _agg_node; +class AggSinkOperator final : public Operator { +public: + AggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node); + bool can_write() override { return true; }; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index a0dc95615f..daecafdd0e 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -22,46 +22,7 @@ namespace doris { namespace pipeline { -AggregationSourceOperator::AggregationSourceOperator(OperatorBuilder* templ, - vectorized::AggregationNode* node) - : Operator(templ), _agg_node(node) {} - -Status AggregationSourceOperator::prepare(RuntimeState* state) { - _agg_node->increase_ref(); - return Status::OK(); -} - -bool AggregationSourceOperator::can_read() { - return _agg_node->can_read(); -} - -Status AggregationSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - bool eos = false; - RETURN_IF_ERROR(_agg_node->pull(state, block, &eos)); - source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; - return Status::OK(); -} - -Status AggregationSourceOperator::close(RuntimeState* state) { - _fresh_exec_timer(_agg_node); - if (!_agg_node->decrease_ref()) { - _agg_node->release_resource(state); - } - return Status::OK(); -} - -/////////////////////////////// operator template //////////////////////////////// - -AggregationSourceOperatorBuilder::AggregationSourceOperatorBuilder( - int32_t id, const std::string& name, vectorized::AggregationNode* exec_node) - : OperatorBuilder(id, name, exec_node) {} - -OperatorPtr AggregationSourceOperatorBuilder::build_operator() { - return std::make_shared( - this, assert_cast(_related_exec_node)); -} +OPERATOR_CODE_GENERATOR(AggSourceOperator, Operator) } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 1c611d163d..87e8f2bb11 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -25,27 +25,19 @@ class AggregationNode; namespace pipeline { -// For read none streaming agg sink operator's data -class AggregationSourceOperator : public Operator { +class AggSourceOperatorBuilder final : public OperatorBuilder { public: - AggregationSourceOperator(OperatorBuilder*, vectorized::AggregationNode*); - Status prepare(RuntimeState* state) override; - bool can_read() override; - Status close(RuntimeState* state) override; - Status get_block(RuntimeState*, vectorized::Block*, SourceState&) override; - -private: - vectorized::AggregationNode* _agg_node; -}; - -class AggregationSourceOperatorBuilder : public OperatorBuilder { -public: - AggregationSourceOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*); + AggSourceOperatorBuilder(int32_t, ExecNode*); bool is_source() const override { return true; } OperatorPtr build_operator() override; }; +class AggSourceOperator final : public Operator { +public: + AggSourceOperator(OperatorBuilderBase*, ExecNode*); +}; + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/empty_set_operator.cpp b/be/src/pipeline/exec/empty_set_operator.cpp index a0913766cd..2f607080d5 100644 --- a/be/src/pipeline/exec/empty_set_operator.cpp +++ b/be/src/pipeline/exec/empty_set_operator.cpp @@ -21,24 +21,6 @@ namespace doris::pipeline { -EmptySetSourceOperator::EmptySetSourceOperator(EmptySetSourceOperatorBuilder* operator_builder, - vectorized::VEmptySetNode* empty_set_node) - : Operator(operator_builder), _empty_set_node(empty_set_node) {} - -bool EmptySetSourceOperator::can_read() { - return true; -} - -Status EmptySetSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - bool eos = false; - RETURN_IF_ERROR(_empty_set_node->get_next(state, block, &eos)); - source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; - return Status::OK(); -} - -EmptySetSourceOperatorBuilder::EmptySetSourceOperatorBuilder( - int32_t id, const string& name, vectorized::VEmptySetNode* empty_set_node) - : OperatorBuilder(id, name, empty_set_node), _empty_set_node(empty_set_node) {} +OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, Operator) } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h index 90af89dbcb..a0a1998971 100644 --- a/be/src/pipeline/exec/empty_set_operator.h +++ b/be/src/pipeline/exec/empty_set_operator.h @@ -27,34 +27,19 @@ class VEmptySetNode; namespace pipeline { -class EmptySetSourceOperatorBuilder; - -class EmptySetSourceOperator : public Operator { +class EmptySetSourceOperatorBuilder final : public OperatorBuilder { public: - EmptySetSourceOperator(EmptySetSourceOperatorBuilder* operator_builder, - vectorized::VEmptySetNode* empty_set_node); - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; - - bool can_read() override; - -private: - vectorized::VEmptySetNode* _empty_set_node; -}; - -class EmptySetSourceOperatorBuilder : public OperatorBuilder { -public: - EmptySetSourceOperatorBuilder(int32_t id, const std::string& name, - vectorized::VEmptySetNode* empty_set_node); + EmptySetSourceOperatorBuilder(int32_t id, ExecNode* empty_set_node); bool is_source() const override { return true; } - OperatorPtr build_operator() override { - return std::make_shared(this, _empty_set_node); - } + OperatorPtr build_operator() override; +}; -private: - vectorized::VEmptySetNode* _empty_set_node; +class EmptySetSourceOperator final : public Operator { +public: + EmptySetSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* empty_set_node); + bool can_read() override { return true; }; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 7179f11d30..6c0916baf1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -27,70 +27,48 @@ namespace doris::pipeline { -ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilder* operator_builder, - vectorized::VDataStreamSender* sink, - PipelineFragmentContext* context) - : Operator(operator_builder), _sink(sink), _context(context) {} +ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, + PipelineFragmentContext* context) + : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _context(context) {} -ExchangeSinkOperator::~ExchangeSinkOperator() = default; - -Status ExchangeSinkOperator::init(ExecNode* exec_node, RuntimeState* state) { - RETURN_IF_ERROR(Operator::init(exec_node, state)); - _state = state; - return Status::OK(); +OperatorPtr ExchangeSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink, _context); } +ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink, + PipelineFragmentContext* context) + : DataSinkOperator(operator_builder, sink), _context(context) {} + Status ExchangeSinkOperator::init(const TDataSink& tsink) { RETURN_IF_ERROR(_sink->init(tsink)); - - PUniqueId query_id; - query_id.set_hi(_state->query_id().hi); - query_id.set_lo(_state->query_id().lo); - _sink_buffer = - std::make_unique(query_id, tsink.stream_sink.dest_node_id, - _sink->_sender_id, _state->be_number(), _context); + _dest_node_id = tsink.stream_sink.dest_node_id; return Status::OK(); } Status ExchangeSinkOperator::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Operator::prepare(state)); - RETURN_IF_ERROR(_sink->prepare(state)); - _sink->profile()->add_child(_runtime_profile.get(), true, nullptr); + _state = state; + PUniqueId id; + id.set_hi(_state->query_id().hi); + id.set_lo(_state->query_id().lo); + _sink_buffer = std::make_unique(id, _dest_node_id, _sink->_sender_id, + _state->be_number(), _context); + RETURN_IF_ERROR(DataSinkOperator::prepare(state)); _sink->registe_channels(_sink_buffer.get()); return Status::OK(); } -Status ExchangeSinkOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(_sink->open(state)); - return Status::OK(); -} - bool ExchangeSinkOperator::can_write() { return _sink_buffer->can_write() && _sink->channel_all_can_write(); } -Status ExchangeSinkOperator::finalize(RuntimeState* state) { - Status result = Status::OK(); - RETURN_IF_ERROR(_sink->close(state, result)); - return result; -} - -Status ExchangeSinkOperator::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(_sink->send(state, block, source_state == SourceState::FINISHED)); - return Status::OK(); -} - bool ExchangeSinkOperator::is_pending_finish() const { return _sink_buffer->is_pending_finish(); } Status ExchangeSinkOperator::close(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperator::close(state)); _sink_buffer->close(); - RETURN_IF_ERROR(Operator::close(state)); return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 481baf1868..c35e30f49b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -26,21 +26,27 @@ namespace doris { namespace pipeline { class PipelineFragmentContext; -// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker. -class ExchangeSinkOperator : public Operator { +class ExchangeSinkOperatorBuilder final + : public DataSinkOperatorBuilder { public: - ExchangeSinkOperator(OperatorBuilder* operator_builder, vectorized::VDataStreamSender* sink, + ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context); + + OperatorPtr build_operator() override; + +private: + PipelineFragmentContext* _context; +}; + +// Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker. +class ExchangeSinkOperator final : public DataSinkOperator { +public: + ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink, PipelineFragmentContext* context); - ~ExchangeSinkOperator() override; - Status init(ExecNode* exec_node, RuntimeState* state = nullptr) override; Status init(const TDataSink& tsink) override; Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; bool can_write() override; - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; bool is_pending_finish() const override; - Status finalize(RuntimeState* state) override; Status close(RuntimeState* state) override; @@ -48,28 +54,10 @@ public: private: std::unique_ptr _sink_buffer; - vectorized::VDataStreamSender* _sink; + int _dest_node_id = -1; RuntimeState* _state = nullptr; PipelineFragmentContext* _context; }; -class ExchangeSinkOperatorBuilder : public OperatorBuilder { -public: - ExchangeSinkOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node, - vectorized::VDataStreamSender* sink, - PipelineFragmentContext* context) - : OperatorBuilder(id, name, exec_node), _sink(sink), _context(context) {} - - bool is_sink() const override { return true; } - - OperatorPtr build_operator() override { - return std::make_shared(this, _sink, _context); - } - -private: - vectorized::VDataStreamSender* _sink; - PipelineFragmentContext* _context; -}; - } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 746d87e107..31bed810cb 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -23,41 +23,14 @@ namespace doris::pipeline { -ExchangeSourceOperator::ExchangeSourceOperator(OperatorBuilder* operator_builder, - vectorized::VExchangeNode* node) - : Operator(operator_builder), _exchange_node(node) {} - -Status ExchangeSourceOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - return _exchange_node->alloc_resource(state); -} +OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, Operator) bool ExchangeSourceOperator::can_read() { - return _exchange_node->_stream_recvr->ready_to_read(); -} - -Status ExchangeSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - bool eos = false; - auto st = _exchange_node->get_next(state, block, &eos); - source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; - return st; + return _node->_stream_recvr->ready_to_read(); } bool ExchangeSourceOperator::is_pending_finish() const { // TODO HappenLee return false; } - -Status ExchangeSourceOperator::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - _fresh_exec_timer(_exchange_node); - _exchange_node->release_resource(state); - - return Operator::close(state); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 8d1707a1e0..273d6664c5 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -25,31 +25,20 @@ class VExchangeNode; namespace doris::pipeline { -class ExchangeSourceOperator : public Operator { +class ExchangeSourceOperatorBuilder final : public OperatorBuilder { public: - explicit ExchangeSourceOperator(OperatorBuilder*, vectorized::VExchangeNode*); - Status open(RuntimeState* state) override; - bool can_read() override; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; - bool is_pending_finish() const override; - Status close(RuntimeState* state) override; - -private: - vectorized::VExchangeNode* _exchange_node; -}; - -class ExchangeSourceOperatorBuilder : public OperatorBuilder { -public: - ExchangeSourceOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node) - : OperatorBuilder(id, name, exec_node) {} + ExchangeSourceOperatorBuilder(int32_t id, ExecNode* exec_node); bool is_source() const override { return true; } - OperatorPtr build_operator() override { - return std::make_shared( - this, reinterpret_cast(_related_exec_node)); - } + OperatorPtr build_operator() override; +}; + +class ExchangeSourceOperator final : public Operator { +public: + ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*); + bool can_read() override; + bool is_pending_finish() const override; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h deleted file mode 100644 index 26631c6390..0000000000 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include "scan_operator.h" -#include "vec/exec/scan/new_olap_scan_node.h" - -namespace doris::pipeline { - -class OlapScanOperatorBuilder; -class OlapScanOperator : public ScanOperator { -public: - OlapScanOperator(OperatorBuilder* operator_builder, vectorized::NewOlapScanNode* scan_node); -}; - -class OlapScanOperatorBuilder : public ScanOperatorBuilder { -public: - OlapScanOperatorBuilder(uint32_t id, const std::string& name, - vectorized::NewOlapScanNode* new_olap_scan_node); - - OperatorPtr build_operator() override { - return std::make_shared(this, _new_olap_scan_node); - } - -private: - vectorized::NewOlapScanNode* _new_olap_scan_node; -}; - -} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 572cd1ffcc..770937c8bd 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -19,36 +19,18 @@ namespace doris::pipeline { -Operator::Operator(OperatorBuilder* operator_builder) +OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder) : _operator_builder(operator_builder), _is_closed(false) {} -bool Operator::is_sink() const { +bool OperatorBase::is_sink() const { return _operator_builder->is_sink(); } -bool Operator::is_source() const { +bool OperatorBase::is_source() const { return _operator_builder->is_source(); } -Status Operator::init(ExecNode* exec_node, RuntimeState* state) { - _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); - if (exec_node) { - exec_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); - } - return Status::OK(); -} - -Status Operator::prepare(RuntimeState* state) { - _mem_tracker = std::make_unique("Operator:" + _runtime_profile->name(), - _runtime_profile.get()); - return Status::OK(); -} - -Status Operator::open(RuntimeState* state) { - return Status::OK(); -} - -Status Operator::close(RuntimeState* state) { +Status OperatorBase::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } @@ -56,16 +38,11 @@ Status Operator::close(RuntimeState* state) { return Status::OK(); } -const RowDescriptor& Operator::row_desc() { +const RowDescriptor& OperatorBase::row_desc() { return _operator_builder->row_desc(); } -void Operator::_fresh_exec_timer(doris::ExecNode* node) { - node->_runtime_profile->total_time_counter()->update( - _runtime_profile->total_time_counter()->value()); -} - -std::string Operator::debug_string() const { +std::string OperatorBase::debug_string() const { std::stringstream ss; ss << _operator_builder->get_name() << ", source: " << is_source(); ss << ", sink: " << is_sink() << ", is closed: " << _is_closed; @@ -75,13 +52,13 @@ std::string Operator::debug_string() const { /////////////////////////////////////// OperatorBuilder //////////////////////////////////////////////////////////// -Status OperatorBuilder::prepare(doris::RuntimeState* state) { +Status OperatorBuilderBase::prepare(doris::RuntimeState* state) { _state = state; // runtime filter, now dispose by NewOlapScanNode return Status::OK(); } -void OperatorBuilder::close(doris::RuntimeState* state) { +void OperatorBuilderBase::close(doris::RuntimeState* state) { if (_is_closed) { return; } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d65d0291b3..d821103aa1 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -24,6 +24,15 @@ #include "runtime/runtime_state.h" #include "vec/core/block.h" +#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \ + NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \ + : OperatorBuilder(id, #NAME, exec_node) {} \ + \ + OperatorPtr NAME##Builder::build_operator() { return std::make_shared(this, _node); } \ + \ + NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \ + : SUBCLASS(operator_builder, node) {}; + namespace doris::pipeline { // Result of source pull data, init state is DEPEND_ON_SOURCE @@ -33,7 +42,6 @@ enum class SourceState : uint8_t { FINISHED = 2 }; -// enum class SinkState : uint8_t { SINK_IDLE = 0, // can send block to sink SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block @@ -41,16 +49,86 @@ enum class SinkState : uint8_t { }; //////////////// DO NOT USE THE UP State //////////////// -class OperatorBuilder; -class Operator; +class OperatorBuilderBase; +class OperatorBase; -using OperatorPtr = std::shared_ptr; +using OperatorPtr = std::shared_ptr; using Operators = std::vector; -class Operator { +using OperatorBuilderPtr = std::shared_ptr; +using OperatorBuilders = std::vector; + +class OperatorBuilderBase { public: - explicit Operator(OperatorBuilder* operator_builder); - virtual ~Operator() = default; + OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {} + + virtual ~OperatorBuilderBase() = default; + + virtual OperatorPtr build_operator() = 0; + + virtual bool is_sink() const { return false; } + virtual bool is_source() const { return false; } + + // create the object used by all operator + virtual Status prepare(RuntimeState* state); + + // destory the object used by all operator + virtual void close(RuntimeState* state); + + std::string get_name() const { return _name; } + + RuntimeState* runtime_state() { return _state; } + + virtual const RowDescriptor& row_desc() = 0; + + int32_t id() const { return _id; } + +protected: + const int32_t _id; + const std::string _name; + + RuntimeState* _state = nullptr; + bool _is_closed = false; +}; + +template +class OperatorBuilder : public OperatorBuilderBase { +public: + OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) + : OperatorBuilderBase(id, name), _node(reinterpret_cast(exec_node)) {} + + virtual ~OperatorBuilder() = default; + + const RowDescriptor& row_desc() override { return _node->row_desc(); } + + NodeType* exec_node() const { return _node; } + +protected: + NodeType* _node; +}; + +template +class DataSinkOperatorBuilder : public OperatorBuilderBase { +public: + DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr) + : OperatorBuilderBase(id, name), _sink(reinterpret_cast(sink)) {} + + virtual ~DataSinkOperatorBuilder() = default; + + bool is_sink() const override { return true; } + + const RowDescriptor& row_desc() override { return _sink->row_desc(); } + + SinkType* exec_node() const { return _sink; } + +protected: + SinkType* _sink; +}; + +class OperatorBase { +public: + explicit OperatorBase(OperatorBuilderBase* operator_builder); + virtual ~OperatorBase() = default; // After both sink and source need to know the cancel state. // do cancel work @@ -58,14 +136,11 @@ public: bool is_source() const; - // Should be call after ExecNode is constructed - virtual Status init(ExecNode* exec_node, RuntimeState* state = nullptr); - // Only result sink and data stream sink need to impl the virtual function virtual Status init(const TDataSink& tsink) { return Status::OK(); }; // Do prepare some state of Operator - virtual Status prepare(RuntimeState* state); + virtual Status prepare(RuntimeState* state) = 0; // Like ExecNode,when pipeline task first time be scheduled, can't block // the pipeline should be open after dependencies is finish @@ -73,15 +148,15 @@ public: // Now the pipeline only have one task, so the there is no performance bottleneck for the mechanism, // but if one pipeline have multi task to parallel work, need to rethink the logic // - // Each operator should call open_self() to prepare resource to do data compute. - // if ExecNode split to sink and source operator, open_self() should be called in sink operator - virtual Status open(RuntimeState* state); + // Each operator should call alloc_resource() to prepare resource to do data compute. + // if ExecNode split to sink and source operator, alloc_resource() should be called in sink operator + virtual Status open(RuntimeState* state) = 0; // Release the resource, should not block the thread // // Each operator should call close_self() to release resource // if ExecNode split to sink and source operator, close_self() should be called in source operator - virtual Status close(RuntimeState* state); + virtual Status close(RuntimeState* state) = 0; Status set_child(OperatorPtr child) { if (is_source()) { @@ -96,20 +171,14 @@ public: virtual bool can_write() { return false; } // for sink // for pipeline - virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state, - [[maybe_unused]] vectorized::Block* block, - [[maybe_unused]] SourceState& result_state) { - std::stringstream error_msg; - error_msg << " has not implements get_block"; - return Status::NotSupported(error_msg.str()); - } + virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block, + SourceState& result_state) { + return Status::OK(); + }; // return can write continue - virtual Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { - std::stringstream error_msg; - error_msg << " not a sink "; - return Status::NotSupported(error_msg.str()); - } + virtual Status sink(RuntimeState* state, vectorized::Block* block, + SourceState source_state) = 0; virtual Status finalize(RuntimeState* state) { std::stringstream error_msg; @@ -130,7 +199,7 @@ public: MemTracker* mem_tracker() const { return _mem_tracker.get(); } - const OperatorBuilder* operator_builder() const { return _operator_builder; } + const OperatorBuilderBase* operator_builder() const { return _operator_builder; } const RowDescriptor& row_desc(); @@ -138,11 +207,9 @@ public: std::string debug_string() const; protected: - void _fresh_exec_timer(ExecNode* node); - std::unique_ptr _mem_tracker; - OperatorBuilder* _operator_builder; + OperatorBuilderBase* _operator_builder; // source has no child // if an operator is not source, it will get data from its child. OperatorPtr _child; @@ -155,44 +222,163 @@ private: bool _is_closed = false; }; -class OperatorBuilder { +template +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } - - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } - - ExecNode* exec_node() const { return _related_exec_node; } - - int32_t id() const { return _id; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } protected: - const int32_t _id; - const std::string _name; - ExecNode* _related_exec_node; + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - RuntimeState* _state = nullptr; - bool _is_closed = false; + NodeType* _sink; }; -using OperatorBuilderPtr = std::shared_ptr; -using OperatorBuilders = std::vector; +template +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + bool eos = false; + RETURN_IF_ERROR(_node->pull(state, block, &eos)); + source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); + } + + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } + + virtual bool can_read() override { return _node->can_read(); } + +protected: + void _fresh_exec_timer(NodeType* node) { + node->runtime_profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } + + NodeType* _node; +}; + +template +class DataStateOperator : public Operator { +public: + using NodeType = + std::remove_pointer_t().exec_node())>; + + DataStateOperator(OperatorBuilderBase* builder, ExecNode* node) + : Operator(builder, node), + _child_block(new vectorized::Block), + _child_source_state(SourceState::DEPEND_ON_SOURCE) {}; + + virtual ~DataStateOperator() = default; + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + auto& node = Operator::_node; + auto& child = Operator::_child; + + if (node->need_more_input_data()) { + RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state)); + source_state = _child_source_state; + if (_child_block->rows() == 0) { + return Status::OK(); + } + node->push(state, _child_block.get(), source_state == SourceState::FINISHED); + } + + bool eos = false; + RETURN_IF_ERROR(node->pull(state, block, &eos)); + if (eos) { + source_state = SourceState::FINISHED; + _child_block->clear_column_data(); + } else if (!node->need_more_input_data()) { + source_state = SourceState::MORE_DATA; + } else { + _child_block->clear_column_data(); + } + return Status::OK(); + } + +protected: + std::unique_ptr _child_block; + SourceState _child_source_state; +}; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 467057e034..0af15b2a31 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -19,58 +19,8 @@ #include "vec/exec/vrepeat_node.h" -namespace doris { -namespace pipeline { +namespace doris::pipeline { -RepeatOperator::RepeatOperator(RepeatOperatorBuilder* operator_builder, - vectorized::VRepeatNode* repeat_node) - : Operator(operator_builder), _repeat_node(repeat_node) {} +OPERATOR_CODE_GENERATOR(RepeatOperator, DataStateOperator) -Status RepeatOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(Operator::open(state)); - _child_block.reset(new vectorized::Block); - return _repeat_node->alloc_resource(state); -} - -Status RepeatOperator::close(RuntimeState* state) { - _fresh_exec_timer(_repeat_node); - _repeat_node->release_resource(state); - Operator::close(state); - return Status::OK(); -} - -Status RepeatOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (_repeat_node->need_more_input_data()) { - RETURN_IF_ERROR(_child->get_block(state, _child_block.get(), _child_source_state)); - source_state = _child_source_state; - if (_child_block->rows() == 0) { - return Status::OK(); - } - - _repeat_node->push(state, _child_block.get(), source_state == SourceState::FINISHED); - } - - bool eos = false; - RETURN_IF_ERROR(_repeat_node->pull(state, block, &eos)); - if (eos) { - source_state = SourceState::FINISHED; - _child_block->clear_column_data(); - } else if (!_repeat_node->need_more_input_data()) { - source_state = SourceState::MORE_DATA; - } else { - _child_block->clear_column_data(); - } - return Status::OK(); -} - -RepeatOperatorBuilder::RepeatOperatorBuilder(int32_t id, vectorized::VRepeatNode* repeat_node) - : OperatorBuilder(id, "RepeatOperatorBuilder", repeat_node), _repeat_node(repeat_node) {} - -OperatorPtr RepeatOperatorBuilder::build_operator() { - return std::make_shared(this, _repeat_node); -} -} // namespace pipeline -} // namespace doris +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index e2aaaeab02..5254a47d2a 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -26,32 +26,17 @@ class VExprContext; class Block; } // namespace vectorized namespace pipeline { -class RepeatOperatorBuilder; -class RepeatOperator : public Operator { + +class RepeatOperatorBuilder final : public OperatorBuilder { public: - RepeatOperator(RepeatOperatorBuilder* operator_builder, vectorized::VRepeatNode* repeat_node); - - Status open(RuntimeState* state) override; - - Status close(RuntimeState* state) override; - - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; - -private: - vectorized::VRepeatNode* _repeat_node; - std::unique_ptr _child_block; - SourceState _child_source_state; -}; - -class RepeatOperatorBuilder : public OperatorBuilder { -public: - RepeatOperatorBuilder(int32_t id, vectorized::VRepeatNode* repeat_node); + RepeatOperatorBuilder(int32_t id, ExecNode* repeat_node); OperatorPtr build_operator() override; +}; -private: - vectorized::VRepeatNode* _repeat_node; +class RepeatOperator final : public DataStateOperator { +public: + RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node); }; } // namespace pipeline diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index fb2a14355e..971c459671 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -21,49 +21,18 @@ #include "vec/sink/vresult_sink.h" namespace doris::pipeline { -ResultSinkOperator::ResultSinkOperator(OperatorBuilder* operator_builder, - vectorized::VResultSink* sink) - : Operator(operator_builder), _sink(sink) {} -Status ResultSinkOperator::init(const TDataSink& tsink) { - return Status::OK(); +ResultSinkOperatorBuilder::ResultSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "ResultSinkOperator", sink) {}; + +OperatorPtr ResultSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); } -Status ResultSinkOperator::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Operator::prepare(state)); - return _sink->prepare(state); -} - -Status ResultSinkOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - return _sink->open(state); -} +ResultSinkOperator::ResultSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {}; bool ResultSinkOperator::can_write() { return _sink->_sender->can_sink(); } - -Status ResultSinkOperator::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (!block) { - DCHECK(source_state == SourceState::FINISHED) - << "block is null, eos should invoke in finalize."; - return Status::OK(); - } - return _sink->send(state, block); -} - -Status ResultSinkOperator::finalize(RuntimeState* state) { - _finalized = true; - return _sink->close(state, Status::OK()); -} - -// TODO: Support fresh exec time for sink -Status ResultSinkOperator::close(RuntimeState* state) { - if (!_finalized) { - RETURN_IF_ERROR(_sink->close(state, Status::InternalError("Not finalized"))); - } - return Status::OK(); -} } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 190cdfb570..483cd98261 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -26,43 +26,18 @@ class VResultSink; namespace pipeline { -class ResultSinkOperator : public Operator { +class ResultSinkOperatorBuilder final : public DataSinkOperatorBuilder { public: - ResultSinkOperator(OperatorBuilder* operator_builder, vectorized::VResultSink* sink); + ResultSinkOperatorBuilder(int32_t id, DataSink* sink); - Status init(const TDataSink& tsink) override; - - Status prepare(RuntimeState* state) override; - - Status open(RuntimeState* state) override; - - bool can_write() override; - - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; - - Status finalize(RuntimeState* state) override; - - Status close(RuntimeState* state) override; - -private: - vectorized::VResultSink* _sink; - bool _finalized = false; + OperatorPtr build_operator() override; }; -class ResultSinkOperatorBuilder : public OperatorBuilder { +class ResultSinkOperator final : public DataSinkOperator { public: - ResultSinkOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node, - vectorized::VResultSink* sink) - : OperatorBuilder(id, name, exec_node), _sink(sink) {} + ResultSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink); - bool is_sink() const override { return true; } - - OperatorPtr build_operator() override { - return std::make_shared(this, _sink); - } - -private: - vectorized::VResultSink* _sink; + bool can_write() override; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4a06a80c95..bbf63954b7 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -22,47 +22,35 @@ namespace doris::pipeline { -ScanOperator::ScanOperator(OperatorBuilder* operator_builder, vectorized::VScanNode* scan_node) - : Operator(operator_builder), _scan_node(scan_node) {} +OPERATOR_CODE_GENERATOR(ScanOperator, Operator) Status ScanOperator::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(Operator::open(state)); - return _scan_node->open(state); + return _node->open(state); } bool ScanOperator::can_read() { - if (_scan_node->_eos || !_scan_node->_scanner_ctx || _scan_node->_scanner_ctx->done() || - _scan_node->_scanner_ctx->can_finish()) { + if (_node->_eos || !_node->_scanner_ctx || _node->_scanner_ctx->done() || + _node->_scanner_ctx->can_finish()) { // _eos: need eos // !_scanner_ctx: need call open // _scanner_ctx->done(): need finish // _scanner_ctx->can_finish(): should be scheduled return true; } else { - return !_scan_node->_scanner_ctx->empty_in_queue(); // have block to process + return !_node->_scanner_ctx->empty_in_queue(); // have block to process } } -Status ScanOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& result_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - bool eos = false; - RETURN_IF_ERROR(_scan_node->get_next(state, block, &eos)); - result_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; - return Status::OK(); -} - bool ScanOperator::is_pending_finish() const { - return _scan_node->_scanner_ctx && !_scan_node->_scanner_ctx->can_finish(); + return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish(); } Status ScanOperator::close(RuntimeState* state) { - if (!is_closed()) { - RETURN_IF_ERROR(_scan_node->close(state)); - } - _fresh_exec_timer(_scan_node); - return Operator::close(state); + RETURN_IF_ERROR(Operator::close(state)); + _node->close(state); + return Status::OK(); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 1f32f1c899..550a7540ed 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -29,31 +29,24 @@ class ScannerContext; namespace doris::pipeline { -class ScanOperator : public Operator { +class ScanOperatorBuilder : public OperatorBuilder { public: - ScanOperator(OperatorBuilder* operator_builder, vectorized::VScanNode* scan_node); + ScanOperatorBuilder(int32_t id, ExecNode* exec_node); + bool is_source() const override { return true; } + OperatorPtr build_operator() override; +}; + +class ScanOperator : public Operator { +public: + ScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node); bool can_read() override; // for source - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& result_state) override; - bool is_pending_finish() const override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - -private: - vectorized::VScanNode* _scan_node; -}; - -class ScanOperatorBuilder : public OperatorBuilder { -public: - ScanOperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node) - : OperatorBuilder(id, name, exec_node) {} - - bool is_source() const override { return true; } }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 273f20198c..6f65718b3a 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -21,31 +21,6 @@ namespace doris::pipeline { -SortSinkOperatorBuilder::SortSinkOperatorBuilder(int32_t id, const string& name, - vectorized::VSortNode* sort_node) - : OperatorBuilder(id, name, sort_node), _sort_node(sort_node) {} +OPERATOR_CODE_GENERATOR(SortSinkOperator, Operator) -SortSinkOperator::SortSinkOperator(SortSinkOperatorBuilder* operator_builder, - vectorized::VSortNode* sort_node) - : Operator(operator_builder), _sort_node(sort_node) {} - -Status SortSinkOperator::open(doris::RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(Operator::open(state)); - RETURN_IF_ERROR(_sort_node->alloc_resource(state)); - return Status::OK(); -} - -Status SortSinkOperator::close(doris::RuntimeState* /*state*/) { - _fresh_exec_timer(_sort_node); - return Status::OK(); -} - -Status SortSinkOperator::sink(doris::RuntimeState* state, vectorized::Block* block, - SourceState source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - // TODO pipeline when sort node's _reuse_mem is false, we should pass a new block to it. - RETURN_IF_ERROR(_sort_node->sink(state, block, source_state == SourceState::FINISHED)); - return Status::OK(); -} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 96adbfa31e..97ce409328 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -29,41 +29,20 @@ class VSortNode; namespace pipeline { -class SortSinkOperatorBuilder; - -class SortSinkOperator : public Operator { +class SortSinkOperatorBuilder final : public OperatorBuilder { public: - SortSinkOperator(SortSinkOperatorBuilder* operator_builder, vectorized::VSortNode* sort_node); - - Status open(RuntimeState* state) override; - - Status close(RuntimeState* state) override; - - // return can write continue - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; - - Status finalize(RuntimeState* /*state*/) override { return Status::OK(); } - - bool can_write() override { return true; }; - -private: - vectorized::VSortNode* _sort_node; -}; - -class SortSinkOperatorBuilder : public OperatorBuilder { -public: - SortSinkOperatorBuilder(int32_t id, const std::string& name, vectorized::VSortNode* sort_node); + SortSinkOperatorBuilder(int32_t id, ExecNode* sort_node); bool is_sink() const override { return true; } - bool is_source() const override { return false; } + OperatorPtr build_operator() override; +}; - OperatorPtr build_operator() override { - return std::make_shared(this, _sort_node); - } +class SortSinkOperator final : public Operator { +public: + SortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node); -private: - vectorized::VSortNode* _sort_node; + bool can_write() override { return true; }; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 4f25b216c3..e0be159263 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -21,34 +21,6 @@ namespace doris::pipeline { -SortSourceOperatorBuilder::SortSourceOperatorBuilder(int32_t id, const string& name, - vectorized::VSortNode* sort_node) - : OperatorBuilder(id, name, sort_node), _sort_node(sort_node) {} - -SortSourceOperator::SortSourceOperator(SortSourceOperatorBuilder* operator_builder, - vectorized::VSortNode* sort_node) - : Operator(operator_builder), _sort_node(sort_node) {} - -Status SortSourceOperator::close(doris::RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - _fresh_exec_timer(_sort_node); - _sort_node->release_resource(state); - return Operator::close(state); -} - -bool SortSourceOperator::can_read() { - return _sort_node->can_read(); -} - -Status SortSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - bool eos = false; - RETURN_IF_ERROR(_sort_node->pull(state, block, &eos)); - source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; - return Status::OK(); -} +OPERATOR_CODE_GENERATOR(SortSourceOperator, Operator) } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 9b83ce06fb..e8e1afe6f1 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -29,39 +29,18 @@ class VSortNode; namespace pipeline { -class SortSourceOperatorBuilder; - -class SortSourceOperator : public Operator { +class SortSourceOperatorBuilder final : public OperatorBuilder { public: - SortSourceOperator(SortSourceOperatorBuilder* operator_builder, - vectorized::VSortNode* sort_node); - - Status close(RuntimeState* state) override; - - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; - - bool can_read() override; - -private: - vectorized::VSortNode* _sort_node; -}; - -class SortSourceOperatorBuilder : public OperatorBuilder { -public: - SortSourceOperatorBuilder(int32_t id, const std::string& name, - vectorized::VSortNode* sort_node); - - bool is_sink() const override { return false; } + SortSourceOperatorBuilder(int32_t id, ExecNode* sort_node); bool is_source() const override { return true; } - OperatorPtr build_operator() override { - return std::make_shared(this, _sort_node); - } + OperatorPtr build_operator() override; +}; -private: - vectorized::VSortNode* _sort_node; +class SortSourceOperator final : public Operator { +public: + SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node); }; } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 9c1a2a57e8..70b9fd8e8b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -21,10 +21,10 @@ namespace doris::pipeline { -StreamingAggSinkOperator::StreamingAggSinkOperator( - StreamingAggSinkOperatorBuilder* operator_builder, vectorized::AggregationNode* agg_node, - std::shared_ptr agg_context) - : Operator(operator_builder), _agg_node(agg_node), _agg_context(std::move(agg_context)) {} +StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, + ExecNode* agg_node, + std::shared_ptr agg_context) + : Operator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {} Status StreamingAggSinkOperator::prepare(RuntimeState* state) { RETURN_IF_ERROR(Operator::prepare(state)); @@ -34,13 +34,6 @@ Status StreamingAggSinkOperator::prepare(RuntimeState* state) { return Status::OK(); } -Status StreamingAggSinkOperator::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(Operator::open(state)); - RETURN_IF_ERROR(_agg_node->alloc_resource(state)); - return Status::OK(); -} - bool StreamingAggSinkOperator::can_write() { // sink and source in diff threads return _agg_context->has_enough_space_to_push(); @@ -52,7 +45,7 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in Status ret = Status::OK(); if (in_block && in_block->rows() > 0) { auto bock_from_ctx = _agg_context->get_free_block(); - RETURN_IF_ERROR(_agg_node->do_pre_agg(in_block, bock_from_ctx.get())); + RETURN_IF_ERROR(_node->do_pre_agg(in_block, bock_from_ctx.get())); if (bock_from_ctx->rows() == 0) { _agg_context->return_free_block(std::move(bock_from_ctx)); } else { @@ -67,7 +60,7 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in } Status StreamingAggSinkOperator::close(RuntimeState* state) { - _fresh_exec_timer(_agg_node); + Operator::close(state); if (_agg_context && !_agg_context->is_finish()) { // finish should be set, if not set here means error. _agg_context->set_canceled(); @@ -77,25 +70,13 @@ Status StreamingAggSinkOperator::close(RuntimeState* state) { return Status::OK(); } -/////////////////////////////// operator template //////////////////////////////// - StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder( - int32_t id, const std::string& name, vectorized::AggregationNode* exec_node, - std::shared_ptr agg_context) - : OperatorBuilder(id, name, exec_node), - _agg_node(exec_node), + int32_t id, ExecNode* exec_node, std::shared_ptr agg_context) + : OperatorBuilder(id, "StreamingAggSinkOperator", exec_node), _agg_context(std::move(agg_context)) {} OperatorPtr StreamingAggSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _agg_node, _agg_context); + return std::make_shared(this, _node, _agg_context); } -// use final aggregation source operator -bool StreamingAggSinkOperatorBuilder::is_sink() const { - return true; -} - -bool StreamingAggSinkOperatorBuilder::is_source() const { - return false; -} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 691521ee9b..fe3edc7b52 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -28,26 +28,34 @@ class Block; } // namespace vectorized namespace pipeline { -class StreamingAggSinkOperatorBuilder; -class StreamingAggSinkOperator : public Operator { + +class StreamingAggSinkOperatorBuilder final : public OperatorBuilder { public: - StreamingAggSinkOperator(StreamingAggSinkOperatorBuilder* operator_builder, - vectorized::AggregationNode*, std::shared_ptr); + StreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); + + OperatorPtr build_operator() override; + + bool is_sink() const override { return true; }; + bool is_source() const override { return false; }; + +private: + std::shared_ptr _agg_context; +}; + +class StreamingAggSinkOperator final : public Operator { +public: + StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*, + std::shared_ptr); Status prepare(RuntimeState*) override; - Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; bool can_write() override; Status close(RuntimeState* state) override; - Status finalize(doris::RuntimeState* state) override { return Status::OK(); } - private: - vectorized::AggregationNode* _agg_node; vectorized::Block _preagg_block = vectorized::Block(); RuntimeProfile::Counter* _queue_byte_size_counter; @@ -56,20 +64,5 @@ private: std::shared_ptr _agg_context; }; -class StreamingAggSinkOperatorBuilder : public OperatorBuilder { -public: - StreamingAggSinkOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*, - std::shared_ptr); - - OperatorPtr build_operator() override; - - bool is_sink() const override; - bool is_source() const override; - -private: - vectorized::AggregationNode* _agg_node; - std::shared_ptr _agg_context; -}; - } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index 3270b553fd..e246876a0c 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -21,15 +21,9 @@ namespace doris { namespace pipeline { -StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilder* templ, - vectorized::AggregationNode* node, +StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node, std::shared_ptr agg_context) - : Operator(templ), _agg_node(node), _agg_context(std::move(agg_context)) {} - -Status StreamingAggSourceOperator::prepare(RuntimeState* state) { - _agg_node->increase_ref(); - return Status::OK(); -} + : Operator(templ, node), _agg_context(std::move(agg_context)) {} bool StreamingAggSourceOperator::can_read() { return _agg_context->has_data_or_finished(); @@ -44,14 +38,14 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl RETURN_IF_ERROR(_agg_context->get_block(&agg_block)); if (_agg_context->data_exhausted()) { - RETURN_IF_ERROR(_agg_node->pull(state, block, &eos)); + RETURN_IF_ERROR(_node->pull(state, block, &eos)); } else { block->swap(*agg_block); - agg_block->clear_column_data(_agg_node->row_desc().num_materialized_slots()); + agg_block->clear_column_data(_node->row_desc().num_materialized_slots()); _agg_context->return_free_block(std::move(agg_block)); } } else { - RETURN_IF_ERROR(_agg_node->pull(state, block, &eos)); + RETURN_IF_ERROR(_node->pull(state, block, &eos)); } source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; @@ -59,27 +53,13 @@ Status StreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Bl return Status::OK(); } -Status StreamingAggSourceOperator::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - _fresh_exec_timer(_agg_node); - if (!_agg_node->decrease_ref()) { - _agg_node->release_resource(state); - } - return Operator::close(state); -} - -/////////////////////////////// operator template //////////////////////////////// - StreamingAggSourceOperatorBuilder::StreamingAggSourceOperatorBuilder( - int32_t id, const std::string& name, vectorized::AggregationNode* exec_node, - std::shared_ptr agg_context) - : OperatorBuilder(id, name, exec_node), _agg_context(std::move(agg_context)) {} + int32_t id, ExecNode* exec_node, std::shared_ptr agg_context) + : OperatorBuilder(id, "StreamingAggSourceOperator", exec_node), + _agg_context(std::move(agg_context)) {} OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() { - return std::make_shared( - this, assert_cast(_related_exec_node), _agg_context); + return std::make_shared(this, _node, _agg_context); } } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h index 106f1cf99d..2685322e2b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h @@ -25,24 +25,10 @@ class AggregationNode; } namespace pipeline { -class StreamingAggSourceOperator : public Operator { +class StreamingAggSourceOperatorBuilder final + : public OperatorBuilder { public: - StreamingAggSourceOperator(OperatorBuilder*, vectorized::AggregationNode*, - std::shared_ptr); - Status prepare(RuntimeState* state) override; - bool can_read() override; - Status close(RuntimeState* state) override; - Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override; - -private: - vectorized::AggregationNode* _agg_node; - std::shared_ptr _agg_context; -}; - -class StreamingAggSourceOperatorBuilder : public OperatorBuilder { -public: - StreamingAggSourceOperatorBuilder(int32_t, const std::string&, vectorized::AggregationNode*, - std::shared_ptr); + StreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr); bool is_source() const override { return true; } @@ -52,5 +38,15 @@ private: std::shared_ptr _agg_context; }; +class StreamingAggSourceOperator final : public Operator { +public: + StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr); + bool can_read() override; + Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override; + +private: + std::shared_ptr _agg_context; +}; + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp similarity index 57% rename from be/src/pipeline/exec/olap_scan_operator.cpp rename to be/src/pipeline/exec/table_function_operator.cpp index 2e4fdc2570..be8dd0f131 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -15,19 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "olap_scan_operator.h" - -#include "vec/exec/scan/new_olap_scan_node.h" +#include "table_function_operator.h" namespace doris::pipeline { -OlapScanOperator::OlapScanOperator(OperatorBuilder* operator_builder, - vectorized::NewOlapScanNode* scan_node) - : ScanOperator(operator_builder, scan_node) {} +OPERATOR_CODE_GENERATOR(TableFunctionOperator, DataStateOperator) -OlapScanOperatorBuilder::OlapScanOperatorBuilder(uint32_t id, const std::string& name, - vectorized::NewOlapScanNode* new_olap_scan_node) - : ScanOperatorBuilder(id, name, new_olap_scan_node), - _new_olap_scan_node(new_olap_scan_node) {} - -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 0735b981cc..f106abab07 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -20,73 +20,17 @@ #include "operator.h" #include "vec/exec/vtable_function_node.h" -namespace doris { +namespace doris::pipeline { -namespace pipeline { -class TableFunctionOperator; - -class TableFunctionOperatorBuilder : public OperatorBuilder { +class TableFunctionOperatorBuilder final : public OperatorBuilder { public: - TableFunctionOperatorBuilder(int32_t id, vectorized::VTableFunctionNode* node) - : OperatorBuilder(id, "TableFunctionOperatorBuilder", node), _node(node) {} + TableFunctionOperatorBuilder(int32_t id, ExecNode* node); OperatorPtr build_operator() override; - -private: - vectorized::VTableFunctionNode* _node; }; -class TableFunctionOperator : public Operator { +class TableFunctionOperator final : public DataStateOperator { public: - TableFunctionOperator(TableFunctionOperatorBuilder* operator_builder, - vectorized::VTableFunctionNode* node) - : Operator(operator_builder), _node(node) {} - - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Operator::open(state)); - _child_block.reset(new vectorized::Block); - return _node->alloc_resource(state); - } - - Status close(RuntimeState* state) override { - _node->release_resource(state); - _fresh_exec_timer(_node); - return Operator::close(state); - } - - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override { - if (_node->need_more_input_data()) { - RETURN_IF_ERROR(_child->get_block(state, _child_block.get(), _child_source_state)); - source_state = _child_source_state; - if (_child_block->rows() == 0) { - return Status::OK(); - } - _node->push(state, _child_block.get(), source_state == SourceState::FINISHED); - } - - bool eos = false; - RETURN_IF_ERROR(_node->pull(state, block, &eos)); - if (eos) { - source_state = SourceState::FINISHED; - _child_block->clear_column_data(); - } else if (!_node->need_more_input_data()) { - source_state = SourceState::MORE_DATA; - } else { - _child_block->clear_column_data(); - } - return Status::OK(); - } - -private: - vectorized::VTableFunctionNode* _node; - std::unique_ptr _child_block; - SourceState _child_source_state; + TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node); }; - -OperatorPtr TableFunctionOperatorBuilder::build_operator() { - return std::make_shared(this, _node); -} - -} // namespace pipeline -} // namespace doris +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 7342d49850..7c956a5e9d 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -37,7 +37,6 @@ Status Pipeline::build_operators(Operators& operators) { OperatorPtr pre; for (auto& operator_t : _operator_builders) { auto o = operator_t->build_operator(); - RETURN_IF_ERROR(o->init(operator_t->exec_node(), _context->get_runtime_state())); if (pre) { o->set_child(pre); } diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 865b7b2a04..0addd4464b 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -66,7 +66,7 @@ public: Status set_sink(OperatorBuilderPtr& sink_operator); - OperatorBuilder* sink() { return _sink.get(); } + OperatorBuilderBase* sink() { return _sink.get(); } Status build_operators(Operators&); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d1913a1534..0c2793c444 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -26,10 +26,10 @@ #include "exec/empty_set_operator.h" #include "exec/exchange_sink_operator.h" #include "exec/exchange_source_operator.h" -#include "exec/olap_scan_operator.h" #include "exec/repeat_operator.h" #include "exec/result_sink_operator.h" #include "exec/scan_node.h" +#include "exec/scan_operator.h" #include "exec/sort_sink_operator.h" #include "exec/sort_source_operator.h" #include "exec/streaming_aggregation_sink_operator.h" @@ -257,7 +257,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks( for (PipelinePtr& pipeline : _pipelines) { // if sink auto sink = pipeline->sink()->build_operator(); - RETURN_IF_ERROR(sink->init(pipeline->sink()->exec_node(), _runtime_state.get())); // TODO pipeline 1 need to add new interface for exec node and operator sink->init(request.fragment.output_sink); @@ -282,23 +281,20 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur switch (node_type) { // for source case TPlanNodeType::OLAP_SCAN_NODE: { - auto* new_olap_scan_node = assert_cast(node); - OperatorBuilderPtr operator_t = std::make_shared( - fragment_context->next_operator_builder_id(), "OlapScanOperator", - new_olap_scan_node); + OperatorBuilderPtr operator_t = std::make_shared( + fragment_context->next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::EXCHANGE_NODE: { - OperatorBuilderPtr operator_t = std::make_shared( - next_operator_builder_id(), "ExchangeSourceOperator", node); + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::EMPTY_SET_NODE: { - auto* empty_set_node = assert_cast(node); - OperatorBuilderPtr operator_t = std::make_shared( - next_operator_builder_id(), "EmptySetSourceOperator", empty_set_node); + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } @@ -309,50 +305,47 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur if (agg_node->is_streaming_preagg()) { auto agg_ctx = std::make_shared(); OperatorBuilderPtr pre_agg_sink = std::make_shared( - next_operator_builder_id(), "StreamingAggSinkOperator", agg_node, agg_ctx); + next_operator_builder_id(), agg_node, agg_ctx); RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink)); OperatorBuilderPtr pre_agg_source = std::make_shared( - next_operator_builder_id(), "StreamingAggSourceOperator", agg_node, agg_ctx); + next_operator_builder_id(), agg_node, agg_ctx); RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); } else { - OperatorBuilderPtr agg_sink = std::make_shared( - next_operator_builder_id(), "AggSinkOperator", agg_node); + OperatorBuilderPtr agg_sink = + std::make_shared(next_operator_builder_id(), agg_node); RETURN_IF_ERROR(new_pipe->set_sink(agg_sink)); - OperatorBuilderPtr agg_source = std::make_shared( - next_operator_builder_id(), "AggSourceOperator", agg_node); + OperatorBuilderPtr agg_source = std::make_shared( + next_operator_builder_id(), agg_node); RETURN_IF_ERROR(cur_pipe->add_operator(agg_source)); } break; } case TPlanNodeType::SORT_NODE: { - auto* sort_node = assert_cast(node); auto new_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); - OperatorBuilderPtr sort_sink = std::make_shared( - next_operator_builder_id(), "SortSinkOperator", sort_node); + OperatorBuilderPtr sort_sink = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink)); - OperatorBuilderPtr sort_source = std::make_shared( - next_operator_builder_id(), "SortSourceOperator", sort_node); + OperatorBuilderPtr sort_source = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(sort_source)); break; } case TPlanNodeType::REPEAT_NODE: { - auto* repeat_node = assert_cast(node); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr builder = - std::make_shared(next_operator_builder_id(), repeat_node); + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } case TPlanNodeType::TABLE_FUNCTION_NODE: { - auto* repeat_node = assert_cast(node); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); - OperatorBuilderPtr builder = std::make_shared( - next_operator_builder_id(), repeat_node); + OperatorBuilderPtr builder = + std::make_shared(next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } @@ -380,15 +373,13 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { OperatorBuilderPtr sink_; switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: { - auto* exchange_sink = assert_cast(_sink.get()); - sink_ = std::make_shared( - next_operator_builder_id(), "ExchangeSinkOperator", nullptr, exchange_sink, this); + sink_ = std::make_shared(next_operator_builder_id(), + _sink.get(), this); break; } case TDataSinkType::RESULT_SINK: { - auto* result_sink = assert_cast(_sink.get()); - sink_ = std::make_shared( - next_operator_builder_id(), "ResultSinkOperator", nullptr, result_sink); + sink_ = std::make_shared(next_operator_builder_id(), + _sink.get()); break; } default: @@ -566,4 +557,4 @@ void PipelineFragmentContext::send_report(bool done) { } } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 77e2ee8952..0f49cc0029 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -203,7 +203,6 @@ private: std::atomic _next_core = 0; }; -// TODO pipeline sr class BlockedTaskScheduler { public: explicit BlockedTaskScheduler(std::shared_ptr task_queue) diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index b70cb5b88f..6b8e2436aa 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -20,7 +20,6 @@ #include "common/status.h" #include "olap/storage_engine.h" #include "olap/tablet.h" -#include "pipeline/exec/olap_scan_operator.h" #include "pipeline/pipeline.h" #include "pipeline/pipeline_fragment_context.h" #include "util/to_string.h" diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index c641d45d01..eadcf3371e 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -49,6 +49,8 @@ class VScanNode : public ExecNode { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} + virtual ~VScanNode() = default; + friend class VScanner; friend class NewOlapScanner; friend class VFileScanner; diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index b6b2660a00..ac34ca84f6 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -36,7 +36,7 @@ class MemPool; namespace pipeline { class AggSinkOperator; -class AggregationSourceOperator; +class AggSourceOperator; class StreamingAggSinkOperator; class StreamingAggSourceOperator; } // namespace pipeline @@ -789,7 +789,7 @@ public: private: friend class pipeline::AggSinkOperator; friend class pipeline::StreamingAggSinkOperator; - friend class pipeline::AggregationSourceOperator; + friend class pipeline::AggSourceOperator; friend class pipeline::StreamingAggSourceOperator; // group by k1,k2 std::vector _probe_expr_ctxs; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 9a36731b8d..7b0239764a 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -88,6 +88,8 @@ public: bool channel_all_can_write(); + const RowDescriptor& row_desc() { return _row_desc; } + protected: friend class Channel; friend class pipeline::ExchangeSinkBuffer; diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h index b71cf122fe..1e71286118 100644 --- a/be/src/vec/sink/vresult_sink.h +++ b/be/src/vec/sink/vresult_sink.h @@ -56,6 +56,8 @@ public: void set_query_statistics(std::shared_ptr statistics) override; + const RowDescriptor& row_desc() { return _row_desc; } + private: Status prepare_exprs(RuntimeState* state); TResultSinkType::type _sink_type;