From b311ebef6cae6496ed1a6053b8f22e9845476cd6 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 9 Dec 2022 09:08:28 +0800 Subject: [PATCH] [pipeline](refactor) do some refactor for code and comments (#14934) --- .../pipeline/exec/hashjoin_probe_operator.cpp | 2 +- .../pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 107 +++++++++++------- be/src/pipeline/exec/repeat_operator.cpp | 2 +- be/src/pipeline/exec/repeat_operator.h | 2 +- .../pipeline/exec/table_function_operator.cpp | 2 +- .../pipeline/exec/table_function_operator.h | 2 +- be/src/pipeline/task_scheduler.cpp | 2 +- 8 files changed, 76 insertions(+), 45 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index beddf6d655..91cef6d915 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -22,7 +22,7 @@ namespace doris { namespace pipeline { -OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, DataStateOperator) +OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, StatefulOperator) } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index ed2abd0663..7ea3e47546 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -32,7 +32,7 @@ public: OperatorPtr build_operator() override; }; -class HashJoinProbeOperator final : public DataStateOperator { +class HashJoinProbeOperator final : public StatefulOperator { public: HashJoinProbeOperator(OperatorBuilderBase*, ExecNode*); // if exec node split to: sink, source operator. 
the source operator diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 9f98be1a7a..f1e29f0547 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -36,19 +36,33 @@ namespace doris::pipeline { -// Result of source pull data, init state is DEPEND_ON_SOURCE +/** + * State of source operator. + * |------> MORE_DATA ------| + * | ^ | | + * DEPEND_ON_SOURCE ----| |----| |----> FINISHED + * ^ | |------------------------| + * |-------| + */ enum class SourceState : uint8_t { - DEPEND_ON_SOURCE = 0, // Operator has no more data in itself, needs to read from source. - MORE_DATA = 1, // Still have data can read + DEPEND_ON_SOURCE = 0, // Need more data from source. + MORE_DATA = 1, // Has more data to output. (e.g. RepeatNode) FINISHED = 2 }; +/** + * State of sink operator. + * |------> SINK_BUSY ------| + * | ^ | | + * SINK_IDLE --------| |----| |----> FINISHED + * ^ | |------------------------| + * |-------| + */ enum class SinkState : uint8_t { - SINK_IDLE = 0, // can send block to sink - SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block + SINK_IDLE = 0, // Can send block to sink. + SINK_BUSY = 1, // Sink buffer is full, sink operator is blocked until buffer is freed. FINISHED = 2 }; -//////////////// DO NOT USE THE UP State //////////////// class OperatorBuilderBase; class OperatorBase; @@ -70,10 +84,8 @@ public: virtual bool is_sink() const { return false; } virtual bool is_source() const { return false; } - // create the object used by all operator virtual Status prepare(RuntimeState* state); - // destory the object used by all operator virtual void close(RuntimeState* state); std::string get_name() const { return _name; } @@ -131,32 +143,30 @@ public: explicit OperatorBase(OperatorBuilderBase* operator_builder); virtual ~OperatorBase() = default; - // After both sink and source need to know the cancel state. 
- // do cancel work bool is_sink() const; bool is_source() const; - // Only result sink and data stream sink need to impl the virtual function virtual Status init(const TDataSink& tsink) { return Status::OK(); }; - // Do prepare some state of Operator + // Prepare for running. (e.g. resource allocation, etc.) virtual Status prepare(RuntimeState* state) = 0; - // Like ExecNode,when pipeline task first time be scheduled, can't block - // the pipeline should be open after dependencies is finish - // Eg a -> c, b-> c, after a, b pipeline finish, c pipeline should call open - // Now the pipeline only have one task, so the there is no performance bottleneck for the mechanism, - // but if one pipeline have multi task to parallel work, need to rethink the logic - // - // Each operator should call alloc_resource() to prepare resource to do data compute. - // if ExecNode split to sink and source operator, alloc_resource() should be called in sink operator + /** + * Allocate resources needed by this operator. + * + * This is called when the current pipeline is scheduled for the first time. + * e.g. If we have three pipelines and the dependencies are A -> B, B -> C, all operators' `open` + * methods in pipeline C will be called once pipelines A and B have finished. + * + * Now we have only one task per pipeline, so this is not a problem, + * but if one pipeline has multiple tasks running in parallel, we need to rethink this logic. + */ virtual Status open(RuntimeState* state) = 0; - // Release the resource, should not block the thread - // - // Each operator should call close_self() to release resource - // if ExecNode split to sink and source operator, close_self() should be called in source operator + /** + * Release all resources once this operator has done its work. + */ virtual Status close(RuntimeState* state) = 0; Status set_child(OperatorPtr child) { @@ -171,13 +181,19 @@ public: virtual bool can_write() { return false; } // for sink - // for pipeline + /** + * The main method to execute a pipeline task. 
+ * Now it is a pull-based pipeline and each operator pulls data from its child by this method. + */ virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block, SourceState& result_state) { return Status::OK(); }; - // return can write continue + /** + * Push data to the sink operator. + * Data in this block will be sent by RPC or written somewhere eventually. + */ virtual Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) = 0; @@ -187,15 +203,16 @@ public: return Status::NotSupported(error_msg.str()); } - // close be called - // - Source: scan thread do not exist - // - Sink: RPC do not be disposed - // - else return false + /** + * pending_finish means we have called `close` and there is still some work to do before finishing. + * This method reports whether that remaining work is still outstanding. + * + * For a source operator, it is pending_finish iff scan threads have not been released yet. + * For a sink operator, it is pending_finish iff RPC resources have not been released yet. + * Otherwise, it will return false. + */ virtual bool is_pending_finish() const { return false; } - // TODO: should we keep the function - // virtual bool is_finished() = 0; - bool is_closed() const { return _is_closed; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } @@ -211,8 +228,6 @@ protected: std::unique_ptr _mem_tracker; OperatorBuilderBase* _operator_builder; - // source has no child - // if an operator is not source, it will get data from its child. OperatorPtr _child; std::unique_ptr _runtime_profile; @@ -223,6 +238,11 @@ private: bool _is_closed = false; }; +/** + * All operators inherited from DataSinkOperator will hold a DataSink inside. Namely, it is a one-to-one relation between DataSinkOperator and DataSink. + * + * It should be mentioned that not all SinkOperators are inherited from this (e.g. SortSinkOperator, which holds a sort node inside instead of a DataSink). 
+ */ template class DataSinkOperator : public OperatorBase { public: @@ -275,6 +295,9 @@ protected: NodeType* _sink; }; +/** + * All operators inherited from Operator will hold an ExecNode inside. + */ template class Operator : public OperatorBase { public: @@ -337,18 +360,26 @@ protected: NodeType* _node; }; +/** + * StatefulOperator indicates the operators with some states inside. + * + * Specifically, we call an operator stateful if it can determine its output by itself. + * For example, hash join probe operator is a typical StatefulOperator. When it gets a block from the probe side, it will hold this block inside (e.g. _child_block). + * If there are still rows remaining in the probe block, we can get an output block by calling `get_block` without any data from its child. + * In a nutshell, it is a one-to-many relation between input blocks and output blocks for StatefulOperator. + */ template -class DataStateOperator : public Operator { +class StatefulOperator : public Operator { public: using NodeType = std::remove_pointer_t().exec_node())>; - DataStateOperator(OperatorBuilderBase* builder, ExecNode* node) + StatefulOperator(OperatorBuilderBase* builder, ExecNode* node) : Operator(builder, node), _child_block(new vectorized::Block), _child_source_state(SourceState::DEPEND_ON_SOURCE) {}; - virtual ~DataStateOperator() = default; + virtual ~StatefulOperator() = default; Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 0af15b2a31..def1f6da9d 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -21,6 +21,6 @@ namespace doris::pipeline { -OPERATOR_CODE_GENERATOR(RepeatOperator, DataStateOperator) +OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator) } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/repeat_operator.h 
b/be/src/pipeline/exec/repeat_operator.h index 5254a47d2a..15707ea39c 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -34,7 +34,7 @@ public: OperatorPtr build_operator() override; }; -class RepeatOperator final : public DataStateOperator { +class RepeatOperator final : public StatefulOperator { public: RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node); }; diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index be8dd0f131..8146dc2ea0 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -19,6 +19,6 @@ namespace doris::pipeline { -OPERATOR_CODE_GENERATOR(TableFunctionOperator, DataStateOperator) +OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator) } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index f106abab07..f2c0437101 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -29,7 +29,7 @@ public: OperatorPtr build_operator() override; }; -class TableFunctionOperator final : public DataStateOperator { +class TableFunctionOperator final : public StatefulOperator { public: TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node); }; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 051775f094..5c146cd3f5 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -211,7 +211,7 @@ void TaskScheduler::_do_work(size_t index) { if (!task) { task = queue->steal_take(index); if (!task) { - // TODO: The take is a stock method, rethink the logic + // TODO: The take is a blocking method, rethink the logic task = queue->take(index); if (!task) { continue;