diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 49cbe30a3b..4d6f9636de 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -730,7 +730,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operato _pool(pool), _limit(tnode.limit), _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()), - _is_streaming(is_streaming) { + _is_streaming(is_streaming), + _partition_exprs(tnode.agg_node.grouping_exprs) { _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase; } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 7c5483e1e1..232a61d50f 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -366,6 +366,14 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } + ExchangeType get_local_exchange_type() const override { + if (_probe_expr_ctxs.empty()) { + return _needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP; + } + return ExchangeType::SHUFFLE; + } + using DataSinkOperatorX::id; using DataSinkOperatorX::operator_id; using DataSinkOperatorX::get_local_state; @@ -405,6 +413,8 @@ protected: int64_t _limit; // -1: no limit bool _have_conjuncts; const bool _is_streaming; + + const std::vector _partition_exprs; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index b30a829872..c04e35448d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -110,6 +110,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 20d612d578..8065ae3082 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -242,7 +242,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -std::string ExchangeSinkLocalState::id_name() { +std::string ExchangeSinkLocalState::name_suffix() { std::string name = " (id=" + std::to_string(_parent->node_id()); auto& p = _parent->cast(); name += ",dest_id=" + std::to_string(p._dest_node_id); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 048c8d3910..32a61eda33 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -184,7 +184,7 @@ public: [[nodiscard]] int sender_id() const { return _sender_id; } - std::string id_name() override; + std::string name_suffix() override; segment_v2::CompressionTypePB& compression_type(); std::string debug_string(int indentation_level) const override; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 1c766f06a8..f64ebcddca 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -110,6 +110,11 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo : OperatorX(pool, tnode, operator_id, descs), _num_senders(num_senders), _is_merging(tnode.exchange_node.__isset.sort_info), + _is_hash_partition( + tnode.exchange_node.__isset.partition_type && + (tnode.exchange_node.partition_type == TPartitionType::HASH_PARTITIONED || + tnode.exchange_node.partition_type == + TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)), _input_row_desc(descs, tnode.exchange_node.input_row_tuples, std::vector(tnode.nullable_tuples.begin(), tnode.nullable_tuples.begin() + diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 479a879905..633a9c3e29 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -104,15 +104,13 @@ public: return _sub_plan_query_statistics_recvr; } - bool need_to_local_shuffle() const override { - // TODO(gabriel): - return false; - } + bool need_to_local_shuffle() const override { return !_is_hash_partition; } private: friend class ExchangeLocalState; const int _num_senders; const bool _is_merging; + const bool _is_hash_partition; RowDescriptor _input_row_desc; std::shared_ptr _sub_plan_query_statistics_recvr; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 819575211c..f34346c906 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -53,6 +53,11 @@ Status FileScanLocalState::_init_scanners(std::list* s return Status::OK(); } +std::string FileScanLocalState::name_suffix() const { + return fmt::format(" (id={}. table name = {})", std::to_string(_parent->node_id()), + _parent->cast()._table_name); +} + void FileScanLocalState::set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) { int max_scanners = diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 4648ed716a..6ae3344ed7 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -54,6 +54,7 @@ public: void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override; int parent_id() { return _parent->node_id(); } + std::string name_suffix() const override; private: std::vector _scan_ranges; @@ -70,7 +71,9 @@ class FileScanOperatorX final : public ScanOperatorX { public: FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, operator_id, descs) { + : ScanOperatorX(pool, tnode, operator_id, descs), + _table_name(tnode.file_scan_node.__isset.table_name ? tnode.file_scan_node.table_name + : "") { _output_tuple_id = tnode.file_scan_node.tuple_id; } @@ -78,6 +81,8 @@ public: private: friend class FileScanLocalState; + + const std::string _table_name; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 1e31014512..92c61882e2 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -414,6 +414,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st for (const auto& eq_join_conjunct : eq_join_conjuncts) { vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, ctx)); + _partition_exprs.push_back(eq_join_conjunct.right); _build_expr_ctxs.push_back(ctx); const auto vexpr = _build_expr_ctxs.back()->root(); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index b45c2eed75..cc34f46d4d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -155,6 +155,14 @@ public: ._should_build_hash_table; } + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } + ExchangeType get_local_exchange_type() const override { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _is_broadcast_join) { + return ExchangeType::NOOP; + } + return ExchangeType::SHUFFLE; + } + private: friend class HashJoinBuildSinkLocalState; @@ -171,6 +179,7 @@ private: vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; std::vector _runtime_filter_descs; + std::vector _partition_exprs; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 174d102993..74304bf535 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -222,6 +222,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() { HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) : JoinProbeOperatorX(pool, tnode, operator_id, descs), + _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && + tnode.hash_join_node.is_broadcast_join), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) {} @@ -549,6 +551,7 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) for (const auto& eq_join_conjunct : eq_join_conjuncts) { vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx)); + _partition_exprs.push_back(eq_join_conjunct.left); _probe_expr_ctxs.push_back(ctx); bool null_aware = eq_join_conjunct.__isset.opcode && eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 4de50474bf..263b4d4252 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -163,6 +163,13 @@ public: SourceState& source_state) const override; bool need_more_input_data(RuntimeState* state) const override; + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } + ExchangeType get_local_exchange_type() const override { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + return ExchangeType::NOOP; + } + return _is_broadcast_join ? ExchangeType::PASSTHROUGH : ExchangeType::SHUFFLE; + } private: Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, @@ -170,6 +177,7 @@ private: std::vector& res_col_ids) const; friend class HashJoinProbeLocalState; + const bool _is_broadcast_join; // other expr vectorized::VExprContextSPtrs _other_join_conjuncts; // probe expr @@ -182,6 +190,7 @@ private: std::vector _left_output_slot_flags; std::vector _right_output_slot_flags; std::vector _right_table_column_names; + std::vector _partition_exprs; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp b/be/src/pipeline/exec/jdbc_scan_operator.cpp index 1f03939df1..74890f647f 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.cpp +++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp @@ -22,6 +22,11 @@ namespace doris::pipeline { +std::string JDBCScanLocalState::name_suffix() const { + return fmt::format(" (id={}. table name = {})", std::to_string(_parent->node_id()), + _parent->cast()._table_name); +} + Status JDBCScanLocalState::_init_scanners(std::list* scanners) { auto& p = _parent->cast(); std::unique_ptr scanner = vectorized::NewJdbcScanner::create_unique( diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h b/be/src/pipeline/exec/jdbc_scan_operator.h index bf954e25df..2acf5b5ec9 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.h +++ b/be/src/pipeline/exec/jdbc_scan_operator.h @@ -45,6 +45,8 @@ public: : ScanLocalState(state, parent) {} Status _init_scanners(std::list* scanners) override; + std::string name_suffix() const override; + private: friend class vectorized::NewJdbcScanner; }; diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 12d89c1049..6a947c5f6b 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -84,7 +84,7 @@ public: } Status set_child(OperatorXPtr child) override { - if (OperatorX::_child_x) { + if (OperatorX::_child_x && _build_side_child == nullptr) { // when there already (probe) child, others is build child. set_build_side_child(child); } else { @@ -113,7 +113,7 @@ protected: std::unique_ptr _intermediate_row_desc; // output expr vectorized::VExprContextSPtrs _output_expr_ctxs; - OperatorXPtr _build_side_child; + OperatorXPtr _build_side_child = nullptr; const bool _short_circuit_for_null_in_build_side; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index ad6b3c8cdd..8a45634027 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -25,7 +25,7 @@ OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { return std::make_shared(this, _sink); } -std::string MultiCastDataStreamSinkLocalState::id_name() { +std::string MultiCastDataStreamSinkLocalState::name_suffix() { auto& sinks = static_cast(_parent)->sink_node().sinks; std::string id_name = " (dst id : "; for (auto& sink : sinks) { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index a2ad07e529..a30361ebe1 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -60,7 +60,7 @@ class MultiCastDataStreamSinkLocalState final using Base = PipelineXSinkLocalState; using Parent = MultiCastDataStreamSinkOperatorX; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - std::string id_name() override; + std::string name_suffix() override; private: std::shared_ptr _multi_cast_data_streamer; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 2f22daa519..3f0cc36942 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -298,7 +298,7 @@ Status OlapScanLocalState::_init_scanners(std::list* s return Status::OK(); } -TOlapScanNode& OlapScanLocalState::olap_scan_node() { +TOlapScanNode& OlapScanLocalState::olap_scan_node() const { return _parent->cast()._olap_scan_node; } diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 1f0fac55a4..868d3efe55 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -45,7 +45,12 @@ public: OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : ScanLocalState(state, parent) {} - TOlapScanNode& olap_scan_node(); + TOlapScanNode& olap_scan_node() const; + + std::string name_suffix() const override { + return fmt::format(" (id={}. table name = {})", std::to_string(_parent->node_id()), + olap_scan_node().table_name); + } private: friend class vectorized::NewOlapScanner; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index a006cda943..2beea932a8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -269,7 +269,7 @@ protected: OperatorPtr _child; // Used on pipeline X - OperatorXPtr _child_x; + OperatorXPtr _child_x = nullptr; bool _is_closed; }; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 182946188e..b0327e71cd 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -120,6 +120,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; } }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index a0b6de5c62..68213fe9c2 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -136,6 +136,7 @@ public: std::vector>& children() { return _children; } void set_children(std::shared_ptr child) { _children.push_back(child); } + void set_children(std::vector> children) { _children = children; } private: void _init_profile(); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index ce3d943af7..5812da5b7f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -194,6 +194,15 @@ PipelinePtr PipelineFragmentContext::add_pipeline() { return pipeline; } +PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent) { + // _prepared、_submitted, _canceled should do not add pipeline + PipelineId id = _next_pipeline_id++; + auto pipeline = std::make_shared(id, weak_from_this()); + _pipelines.emplace_back(pipeline); + parent->set_children(pipeline); + return pipeline; +} + Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, const size_t idx) { if (_prepared) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f38996a161..480e4332d4 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -71,6 +71,8 @@ public: PipelinePtr add_pipeline(); + PipelinePtr add_pipeline(PipelinePtr parent); + TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 49dc327c79..23ce13a1db 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -574,6 +574,25 @@ public: } }; +enum class ExchangeType : uint8_t { + NOOP = 0, + SHUFFLE = 1, + PASSTHROUGH = 2, +}; + +inline std::string get_exchange_type_name(ExchangeType idx) { + switch (idx) { + case ExchangeType::NOOP: + return "NOOP"; + case ExchangeType::SHUFFLE: + return "SHUFFLE"; + case ExchangeType::PASSTHROUGH: + return "PASSTHROUGH"; + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); +} + class Exchanger; struct LocalExchangeSharedState : public BasicSharedState { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index c5fe9dc7dc..f5f1d0f48a 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -89,10 +89,10 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(bool need_partitioner) override { - _name = "LOCAL_EXCHANGE_SINK_OPERATOR"; - _need_partitioner = need_partitioner; - if (_need_partitioner) { + Status init(ExchangeType type) override { + _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; + _type = type; + if (_type == ExchangeType::SHUFFLE) { _partitioner.reset( new vectorized::Crc32HashPartitioner(_num_partitions)); RETURN_IF_ERROR(_partitioner->init(_texprs)); @@ -102,7 +102,7 @@ public: } Status prepare(RuntimeState* state) override { - if (_need_partitioner) { + if (_type == ExchangeType::SHUFFLE) { RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); } @@ -110,7 +110,7 @@ public: } Status open(RuntimeState* state) override { - if (_need_partitioner) { + if (_type == ExchangeType::SHUFFLE) { RETURN_IF_ERROR(_partitioner->open(state)); } @@ -122,7 +122,7 @@ public: private: friend class LocalExchangeSinkLocalState; - bool _need_partitioner; + ExchangeType _type; const int _num_partitions; const std::vector& _texprs; std::unique_ptr _partitioner; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index dd64852891..4467b32db4 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -36,9 +36,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - _dependency->set_shared_state(info.local_exchange_state); - _shared_state = (LocalExchangeSharedState*)_dependency->shared_state().get(); - DCHECK(_shared_state != nullptr); _channel_id = info.task_idx; _shared_state->set_dep_by_channel_id(_dependency, _channel_id); _exchanger = _shared_state->exchanger.get(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index d94b9041fc..9a44ce9678 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -60,10 +60,9 @@ private: class LocalExchangeSourceOperatorX final : public OperatorX { public: using Base = OperatorX; - LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase* parent) - : Base(pool, -1, id), _parent(parent) {} - Status init(const TPlanNode& tnode, RuntimeState* state) override { - _op_name = "LOCAL_EXCHANGE_OPERATOR"; + LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {} + Status init(ExchangeType type) override { + _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) + ")"; return Status::OK(); } Status prepare(RuntimeState* state) override { return Status::OK(); } @@ -79,21 +78,8 @@ public: bool is_source() const override { return true; } - Status set_child(OperatorXPtr child) override { - if (_child_x) { - // Set build side child for join probe operator - DCHECK(_parent != nullptr); - RETURN_IF_ERROR(_parent->set_child(child)); - } else { - _child_x = std::move(child); - } - return Status::OK(); - } - private: friend class LocalExchangeSourceLocalState; - - OperatorXBase* _parent = nullptr; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 616b469a99..13a3f23222 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -115,8 +115,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, LocalExchangeSinkLocalState& local_state) { - auto new_block = vectorized::Block::create_unique(in_block->clone_empty()); - new_block->swap(*in_block); + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_instances; _data_queue[channel_id].enqueue(std::move(new_block)); local_state._shared_state->set_ready_for_read(channel_id); @@ -127,16 +127,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, LocalExchangeSourceLocalState& local_state) { - std::unique_ptr next_block; + vectorized::Block next_block; if (running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - *block = *next_block.release(); + *block = std::move(next_block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); source_state = SourceState::FINISHED; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - *block = *next_block.release(); + *block = std::move(next_block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 13e3fe931e..b7acff688f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -22,11 +22,6 @@ namespace doris::pipeline { -enum class ExchangeType : uint8_t { - SHUFFLE = 0, - PASSTHROUGH = 1, -}; - class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; @@ -92,7 +87,7 @@ public: ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } private: - std::vector>> _data_queue; + std::vector> _data_queue; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 04f7cea031..76e4d42823 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -330,8 +330,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { - _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + - " (id=" + std::to_string(_parent->node_id()) + ")")); + _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix())); _runtime_profile->set_metadata(_parent->node_id()); _runtime_profile->set_is_sink(false); info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); @@ -341,7 +340,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState if constexpr (!std::is_same_v) { auto& deps = info.upstream_dependencies; if constexpr (std::is_same_v) { - _dependency->set_shared_state(info.local_exchange_state); + _dependency->set_shared_state(info.le_state_map[_parent->operator_id()]); } else { _dependency->set_shared_state(deps.front()->shared_state()); } @@ -402,7 +401,7 @@ template Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + id_name())); + _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + name_suffix())); _profile->set_metadata(_parent->node_id()); _profile->set_is_sink(true); _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); @@ -410,7 +409,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, auto& deps = info.dependencys; _dependency = (DependencyType*)deps.front().get(); if constexpr (std::is_same_v) { - _dependency->set_shared_state(info.local_exchange_state); + _dependency->set_shared_state(info.le_state_map[_parent->dests_id().front()]); } if (_dependency) { _shared_state = diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 58c18db703..69d6a6cbc3 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -20,6 +20,7 @@ #include "common/logging.h" #include "pipeline/exec/operator.h" #include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" namespace doris::pipeline { @@ -28,7 +29,7 @@ struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; const std::vector scan_ranges; std::vector& upstream_dependencies; - std::shared_ptr local_exchange_state; + std::map> le_state_map; int task_idx; DependencySPtr dependency; @@ -39,7 +40,7 @@ struct LocalSinkStateInfo { RuntimeProfile* parent_profile = nullptr; const int sender_id; std::vector& dependencys; - std::shared_ptr local_exchange_state; + std::map> le_state_map; const TDataSink& tsink; }; @@ -160,6 +161,10 @@ public: LOG(FATAL) << "should not reach here!"; return Status::OK(); } + virtual Status init(ExchangeType type) { + LOG(FATAL) << "should not reach here!"; + return Status::OK(); + } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Runtime Profile is not owned by operator"); @@ -170,6 +175,8 @@ public: } [[nodiscard]] std::string get_name() const override { return _op_name; } virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; + virtual std::vector get_local_shuffle_exprs() const { return {}; } + virtual ExchangeType get_local_exchange_type() const { return ExchangeType::NOOP; } Status prepare(RuntimeState* state) override; @@ -313,6 +320,10 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; + virtual std::string name_suffix() const { + return " (id=" + std::to_string(_parent->node_id()) + ")"; + } + Status close(RuntimeState* state) override; [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; @@ -427,7 +438,7 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override; - virtual Status init(bool need_partitioner) { + virtual Status init(ExchangeType type) { return Status::InternalError("init() is only implemented in local exchange!"); } @@ -452,6 +463,8 @@ public: } virtual void get_dependency(std::vector& dependency, QueryContext* ctx) = 0; + virtual std::vector get_local_shuffle_exprs() const { return {}; } + virtual ExchangeType get_local_exchange_type() const { return ExchangeType::NOOP; } Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); @@ -570,7 +583,7 @@ public: [[nodiscard]] std::string debug_string(int indentation_level) const override; - virtual std::string id_name() { return " (id=" + std::to_string(_parent->node_id()) + ")"; } + virtual std::string name_suffix() { return " (id=" + std::to_string(_parent->node_id()) + ")"; } Dependency* dependency() override { return _dependency; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 636d4e235d..71b45ad79c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -231,7 +231,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast(root_pipeline->set_sink(_sink)); - // RETURN_IF_ERROR(_plan_local_shuffle()); + RETURN_IF_ERROR(_plan_local_shuffle()); // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { @@ -254,7 +254,34 @@ Status PipelineXFragmentContext::_plan_local_shuffle() { auto& children = _pipelines[pip_idx]->children(); if (children.empty()) { _pipelines[pip_idx]->init_need_to_local_shuffle_by_source(); - } else { + } else if (children.size() == 1) { + _pipelines[pip_idx]->set_need_to_local_shuffle(children[0]->need_to_local_shuffle()); + } + + int idx = 0; + bool do_local_exchange = false; + do { + auto& ops = _pipelines[pip_idx]->operator_xs(); + do_local_exchange = false; + for (; idx < ops.size();) { + if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) { + RETURN_IF_ERROR(_add_local_exchange( + idx, ops[idx]->node_id(), _runtime_state->obj_pool(), + _pipelines[pip_idx], ops[idx]->get_local_shuffle_exprs(), + ops[idx]->get_local_exchange_type(), &do_local_exchange)); + } + if (do_local_exchange) { + idx = 2; + break; + } + idx++; + } + } while (do_local_exchange); + if (_pipelines[pip_idx]->sink_x()->get_local_exchange_type() != ExchangeType::NOOP) { + RETURN_IF_ERROR(_add_local_exchange( + idx, _pipelines[pip_idx]->sink_x()->node_id(), _runtime_state->obj_pool(), + _pipelines[pip_idx], _pipelines[pip_idx]->sink_x()->get_local_shuffle_exprs(), + _pipelines[pip_idx]->sink_x()->get_local_exchange_type(), &do_local_exchange)); } } return Status::OK(); @@ -443,19 +470,20 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_total_load_streams(request.total_load_streams); _runtime_states[i]->set_num_local_sink(request.num_local_sink); std::map pipeline_id_to_task; - auto get_local_exchange_state = - [&](PipelinePtr pipeline) -> std::shared_ptr { + auto get_local_exchange_state = [&](PipelinePtr pipeline) + -> std::map> { + std::map> le_state_map; auto source_id = pipeline->operator_xs().front()->operator_id(); if (auto iter = _op_id_to_le_state.find(source_id); iter != _op_id_to_le_state.end()) { - return iter->second; + le_state_map.insert({source_id, iter->second}); } for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) { if (auto iter = _op_id_to_le_state.find(sink_to_source_id); iter != _op_id_to_le_state.end()) { - return iter->second; + le_state_map.insert({sink_to_source_id, iter->second}); } } - return nullptr; + return le_state_map; }; for (auto& pipeline : _pipelines) { auto task = std::make_unique( @@ -606,49 +634,100 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, return Status::OK(); } -Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op, - PipelinePtr& cur_pipe, const TPlanNode& tnode, +Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id, ObjectPool* pool, + PipelinePtr cur_pipe, const std::vector& texprs, - ExchangeType exchange_type) { + ExchangeType exchange_type, + bool* do_local_exchange) { if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) { return Status::OK(); } - auto parent = op; - RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get())); - auto local_exchange_id = next_operator_id(); - op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id, parent.get())); - RETURN_IF_ERROR(cur_pipe->add_operator(op)); - RETURN_IF_ERROR(parent->set_child(op)); - const auto downstream_pipeline_id = cur_pipe->id(); - if (_dag.find(downstream_pipeline_id) == _dag.end()) { - _dag.insert({downstream_pipeline_id, {}}); + if (!cur_pipe->need_to_local_shuffle() && exchange_type == ExchangeType::SHUFFLE) { + return Status::OK(); } - cur_pipe = add_pipeline(); - _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + *do_local_exchange = true; + + auto& operator_xs = cur_pipe->operator_xs(); + auto total_op_num = operator_xs.size(); + const auto downstream_pipeline_id = cur_pipe->id(); + auto local_exchange_id = next_operator_id(); + // 1. Create a new pipeline with local exchange sink. + auto new_pip = add_pipeline(); DataSinkOperatorXPtr sink; sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id, _num_instances, texprs)); - RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(new_pip->set_sink(sink)); - bool need_partitioner = false; auto shared_state = LocalExchangeSharedState::create_shared(); shared_state->source_dependencies.resize(_num_instances, nullptr); switch (exchange_type) { case ExchangeType::SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances); - need_partitioner = true; + new_pip->set_need_to_local_shuffle(false); + cur_pipe->set_need_to_local_shuffle(false); break; case ExchangeType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances); + new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle()); + cur_pipe->set_need_to_local_shuffle(true); break; default: return Status::InternalError("Unsupported local exchange type : " + std::to_string((int)exchange_type)); } - RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner)); + RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type)); _op_id_to_le_state.insert({local_exchange_id, shared_state}); + + // 2. Initialize operators list. + std::copy(operator_xs.begin(), operator_xs.begin() + idx, + std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end())); + + // 3. Erase operators in new pipeline. + OperatorXPtr source_op; + source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); + RETURN_IF_ERROR(source_op->init(exchange_type)); + operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx); + if (operator_xs.size() > 0) { + RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); + } + + operator_xs.insert(operator_xs.begin(), source_op); + RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back())); + + std::vector> new_children; + std::vector edges_with_source; + for (auto child : cur_pipe->children()) { + bool found = false; + for (auto op : new_pip->operator_xs()) { + if (child->sink_x()->node_id() == op->node_id()) { + new_pip->set_children(child); + found = true; + }; + } + if (!found) { + new_children.push_back(child); + edges_with_source.push_back(child->id()); + } + } + new_children.push_back(new_pip); + edges_with_source.push_back(new_pip->id()); + + if (!new_pip->children().empty()) { + std::vector edges_with_sink; + for (auto child : new_pip->children()) { + edges_with_sink.push_back(child->id()); + } + _dag.insert({new_pip->id(), edges_with_sink}); + } + cur_pipe->set_children(new_children); + _dag[downstream_pipeline_id] = edges_with_source; + + CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size()) + << "total_op_num: " << total_op_num + << " cur_pipe->operator_xs().size(): " << cur_pipe->operator_xs().size() + << " new_pip->operator_xs().size(): " << new_pip->operator_xs().size(); return Status::OK(); } @@ -704,7 +783,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; sink.reset(new DistinctStreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, @@ -712,10 +791,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); - - // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, - // tnode.agg_node.grouping_exprs, - // ExchangeType::PASSTHROUGH)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation) { op.reset(new StreamingAggSourceOperatorX(pool, tnode, next_operator_id(), descs)); @@ -725,7 +800,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; sink.reset(new StreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); @@ -733,9 +808,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); - // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, - // tnode.agg_node.grouping_exprs, - // ExchangeType::PASSTHROUGH)); } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -744,7 +816,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; @@ -752,20 +824,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); - - // if (tnode.agg_node.grouping_exprs.empty()) { - // if (tnode.agg_node.need_finalize) { - // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, - // tnode.agg_node.grouping_exprs, - // ExchangeType::PASSTHROUGH)); - // } else { - // // TODO(gabriel): maybe use local shuffle - // } - // } else if (cur_pipe->need_to_local_shuffle()) { - // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, - // tnode.agg_node.grouping_exprs, - // ExchangeType::SHUFFLE)); - // } } break; } @@ -777,7 +835,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - PipelinePtr build_side_pipe = add_pipeline(); + PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; @@ -786,20 +844,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); - std::vector probe_exprs; - const std::vector& eq_join_conjuncts = - tnode.hash_join_node.eq_join_conjuncts; - for (const auto& eq_join_conjunct : eq_join_conjuncts) { - probe_exprs.push_back(eq_join_conjunct.left); - } - if (tnode.hash_join_node.__isset.is_broadcast_join && - tnode.hash_join_node.is_broadcast_join) { - RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs, - ExchangeType::PASSTHROUGH)); - } else if (cur_pipe->need_to_local_shuffle()) { - RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs, - ExchangeType::SHUFFLE)); - } _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; @@ -812,7 +856,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - PipelinePtr build_side_pipe = add_pipeline(); + PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; @@ -835,7 +879,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag.insert({downstream_pipeline_id, {}}); } for (int i = 0; i < child_count; i++) { - PipelinePtr build_side_pipe = add_pipeline(); + PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); @@ -855,7 +899,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; @@ -873,7 +917,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; @@ -891,7 +935,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - cur_pipe = add_pipeline(); + cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; @@ -971,7 +1015,7 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node( } for (int child_id = 0; child_id < tnode.num_children; child_id++) { - PipelinePtr probe_side_pipe = add_pipeline(); + PipelinePtr probe_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(probe_side_pipe->id()); DataSinkOperatorXPtr sink; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 1390f2a954..b2239c4991 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -125,9 +125,9 @@ public: private: void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; - Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, - const TPlanNode& tnode, const std::vector& texprs, - ExchangeType exchange_type); + Status _add_local_exchange(int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, + const std::vector& texprs, ExchangeType exchange_type, + bool* do_local_exchange); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 3495d8eb27..559a76b8e6 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -47,14 +47,14 @@ namespace doris::pipeline { PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - std::shared_ptr local_exchange_state, + std::map> le_state_map, int task_idx) : PipelineTask(pipeline, task_id, state, fragment_context, parent_profile), _operators(pipeline->operator_xs()), _source(_operators.front()), _root(_operators.back()), _sink(pipeline->sink_shared_pointer()), - _local_exchange_state(local_exchange_state), + _le_state_map(le_state_map), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); @@ -76,7 +76,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams { // set sink local state LocalSinkStateInfo info {_parent_profile, local_params.sender_id, - get_downstream_dependency(), _local_exchange_state, tsink}; + get_downstream_dependency(), _le_state_map, tsink}; RETURN_IF_ERROR(_sink->setup_local_state(state, info)); } @@ -87,9 +87,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); - LocalStateInfo info {parent_profile, scan_ranges, - deps, _local_exchange_state, - _task_idx, _source_dependency[op->operator_id()]}; + LocalStateInfo info {parent_profile, scan_ranges, deps, + _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; RETURN_IF_ERROR(op->setup_local_state(state, info)); parent_profile = state->get_local_state(op->operator_id())->profile(); } @@ -292,7 +291,7 @@ void PipelineXTask::finalize() { std::vector {}.swap(_downstream_dependency); DependencyMap {}.swap(_upstream_dependency); - _local_exchange_state = nullptr; + _le_state_map.clear(); } Status PipelineXTask::try_close(Status exec_status) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index e4ee71914b..2a5193d3c4 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -52,7 +52,8 @@ class PipelineXTask : public PipelineTask { public: PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - std::shared_ptr local_exchange_state, int task_idx); + std::map> le_state_map, + int task_idx); Status prepare(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); @@ -209,7 +210,7 @@ private: DependencyMap _upstream_dependency; std::map _source_dependency; std::vector _downstream_dependency; - std::shared_ptr _local_exchange_state; + std::map> _le_state_map; int _task_idx; bool _dry_run = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d4f11d25e2..40404423e4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -844,6 +844,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params); if (!prepare_st.ok()) { + LOG(WARNING) << "Prepare failed: " << prepare_st.to_string(); context->close_if_prepare_failed(); return prepare_st; } @@ -923,6 +924,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params, i); if (!prepare_st.ok()) { + LOG(WARNING) << "Prepare failed: " << prepare_st.to_string(); context->close_if_prepare_failed(); static_cast(context->update_status(prepare_st)); return prepare_st; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6eb9f2d75e..72e56d09c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -310,6 +310,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor