From 5f66335e543e9038ed23cbc89c2bf53e682640c1 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Wed, 13 Dec 2023 15:15:20 +0800 Subject: [PATCH] [feature](pipelineX) add local_shuffle in set_operation / assert_num operator (#28293) --- be/src/pipeline/exec/analytic_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/analytic_sink_operator.h | 3 ++- be/src/pipeline/exec/assert_num_rows_operator.h | 2 ++ be/src/pipeline/exec/set_probe_sink_operator.h | 13 ++++++++++++- be/src/pipeline/exec/set_sink_operator.h | 13 ++++++++++++- .../glue/translator/PhysicalPlanTranslator.java | 1 + .../org/apache/doris/planner/SetOperationNode.java | 8 ++++++++ gensrc/thrift/PlanNodes.thrift | 2 ++ 8 files changed, 41 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 835b9f7dc1..3e93645699 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -192,8 +192,8 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate) { -} + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _partition_exprs(tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 576af1c660..8fa1c2e1ea 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -107,7 +107,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } ExchangeType get_local_exchange_type() const override { if (_partition_by_eq_expr_ctxs.empty()) { return ExchangeType::PASSTHROUGH; @@ -133,6 +133,7 @@ private: std::vector _num_agg_input; const bool _is_colocate; + const std::vector _partition_exprs; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 025f6f8c67..060c5be7e5 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -57,6 +57,8 @@ public: [[nodiscard]] bool is_source() const override { return false; } + ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; } + private: friend class AssertNumRowsLocalState; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 89a2ab3bb3..dfa27ebd3f 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -123,7 +123,12 @@ public: friend class SetProbeSinkLocalState; SetProbeSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(sink_id, tnode.node_id, tnode.node_id), _cur_child_id(child_id) {} + : Base(sink_id, tnode.node_id, tnode.node_id), + _cur_child_id(child_id), + _is_colocate(is_intersect ? tnode.intersect_node.is_colocate + : tnode.except_node.is_colocate), + _partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id] + : tnode.except_node.result_expr_lists[child_id]) {} ~SetProbeSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError( @@ -139,6 +144,10 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } + ExchangeType get_local_exchange_type() const override { + return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + } private: void _finalize_probe(SetProbeSinkLocalState& local_state); @@ -149,6 +158,8 @@ private: const int _cur_child_id; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; + const bool _is_colocate; + const std::vector _partition_exprs; using OperatorBase::_child_x; }; diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index d8abe12c9a..e44d59d7c6 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -111,7 +111,12 @@ public: friend class SetSinkLocalState; SetSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(sink_id, tnode.node_id, tnode.node_id), _cur_child_id(child_id) {} + : Base(sink_id, tnode.node_id, tnode.node_id), + _cur_child_id(child_id), + _is_colocate(is_intersect ? tnode.intersect_node.is_colocate + : tnode.except_node.is_colocate), + _partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id] + : tnode.except_node.result_expr_lists[child_id]) {} ~SetSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", @@ -126,6 +131,10 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + std::vector get_local_shuffle_exprs() const override { return _partition_exprs; } + ExchangeType get_local_exchange_type() const override { + return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + } private: template @@ -140,6 +149,8 @@ private: int _child_quantity; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; + const bool _is_colocate; + const std::vector _partition_exprs; using OperatorBase::_child_x; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 239220cba7..7050323b1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1755,6 +1755,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor> const_expr_lists // Index of the first child that needs to be materialized. 4: required i64 first_materialized_child_idx + 5: optional bool is_colocate } struct TExceptNode { @@ -1048,6 +1049,7 @@ struct TExceptNode { 3: required list> const_expr_lists // Index of the first child that needs to be materialized. 4: required i64 first_materialized_child_idx + 5: optional bool is_colocate }