From 15f5acf1aec43428418aed046c204cca12d58189 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Tue, 12 Dec 2023 18:47:46 +0800 Subject: [PATCH] [feature](pipelineX) add local_shuffle in sort partition sort analytic node (#28265) --- be/src/pipeline/exec/analytic_sink_operator.cpp | 4 +++- be/src/pipeline/exec/analytic_sink_operator.h | 10 ++++++++++ be/src/pipeline/exec/partition_sort_sink_operator.h | 6 ++++++ be/src/pipeline/exec/sort_sink_operator.cpp | 3 ++- be/src/pipeline/exec/sort_sink_operator.h | 8 ++++++++ .../glue/translator/PhysicalPlanTranslator.java | 3 +++ .../org/apache/doris/planner/AnalyticEvalNode.java | 8 +++++++- .../org/apache/doris/planner/DistributedPlanner.java | 1 + .../main/java/org/apache/doris/planner/SortNode.java | 11 ++++++++++- gensrc/thrift/PlanNodes.thrift | 3 +++ 10 files changed, 53 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 84afab8344..835b9f7dc1 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -191,7 +191,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id - : 0) {} + : 0), + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate) { +} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 19acfc4d54..576af1c660 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -108,6 +108,15 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + ExchangeType get_local_exchange_type() const override { + if (_partition_by_eq_expr_ctxs.empty()) { + return ExchangeType::PASSTHROUGH; + } else if (_order_by_eq_expr_ctxs.empty()) { + return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; + } + return ExchangeType::NOOP; + } + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); @@ -123,6 +132,7 @@ private: const TTupleId _buffered_tuple_id; std::vector _num_agg_input; + const bool _is_colocate; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index aa82cbd233..ecbbad5c65 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -105,6 +105,12 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + ExchangeType get_local_exchange_type() const override { + if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { + return ExchangeType::NOOP; + } + return ExchangeType::PASSTHROUGH; + } private: friend class PartitionSortSinkLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 13a204e197..0eb14fe056 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -76,7 +76,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _limit(tnode.limit), _use_topn_opt(tnode.sort_node.use_topn_opt), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), - _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {} + _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), + _merge_by_exchange(tnode.sort_node.merge_by_exchange) {} Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index b82871f943..3f44c118c8 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -93,6 +93,13 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; + ExchangeType get_local_exchange_type() const override { + if (_merge_by_exchange) { + // The current sort node is used for the ORDER BY + return ExchangeType::PASSTHROUGH; + } + return ExchangeType::NOOP; + } private: friend class SortSinkLocalState; @@ -113,6 +120,7 @@ private: const RowDescriptor _row_descriptor; const bool _use_two_phase_read; + const bool _merge_by_exchange; }; } // namespace pipeline diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 46e42dbe2c..239220cba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1800,6 +1800,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor analyticFnCalls, List partitionExprs, List orderByElements, @@ -181,6 +183,10 @@ public class AnalyticEvalNode extends PlanNode { cardinality = getChild(0).cardinality; } + public void setColocate(boolean colocate) { + this.isColocate = colocate; + } + @Override protected String debugString() { List orderByElementStrs = Lists.newArrayList(); @@ -215,7 +221,7 @@ public class AnalyticEvalNode extends PlanNode { msg.analytic_node.setPartitionExprs(Expr.treesToThrift(substitutedPartitionExprs)); msg.analytic_node.setOrderByExprs(Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements))); msg.analytic_node.setAnalyticFunctions(Expr.treesToThrift(analyticFnCalls)); - + msg.analytic_node.setIsColocate(isColocate); if (analyticWindow == null) { if (!orderByElements.isEmpty()) { msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index d3dd27ee73..6f54bd898f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -1308,6 +1308,7 @@ public class DistributedPlanner { exchNode.setLimit(limit); } exchNode.setMergeInfo(node.getSortInfo()); + node.setMergeByExchange(); exchNode.setOffset(offset); // Child nodes should not process the offset. If there is a limit, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 375e37edd8..dba8f11798 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -64,7 +64,11 @@ public class SortNode extends PlanNode { private boolean useTopnOpt; private boolean useTwoPhaseReadOpt; - private boolean isDefaultLimit; + // If mergeByexchange is set to true, the sort information is pushed to the + // exchange node, and the sort node is used for the ORDER BY . + private boolean mergeByexchange = false; + + private boolean isDefaultLimit; // if true, the output of this node feeds an AnalyticNode private boolean isAnalyticSort; private DataPartition inputPartition; @@ -134,6 +138,10 @@ public class SortNode extends PlanNode { return info; } + public void setMergeByExchange() { + this.mergeByexchange = true; + } + public boolean getUseTopnOpt() { return useTopnOpt; } @@ -309,6 +317,7 @@ public class SortNode extends PlanNode { msg.sort_node = sortNode; msg.sort_node.setOffset(offset); msg.sort_node.setUseTopnOpt(useTopnOpt); + msg.sort_node.setMergeByExchange(this.mergeByexchange); } @Override diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5cbb15721c..8b846bd576 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -890,6 +890,7 @@ struct TSortNode { // Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT. 6: optional bool is_default_limit 7: optional bool use_topn_opt + 8: optional bool merge_by_exchange } enum TopNAlgorithm { @@ -999,6 +1000,8 @@ struct TAnalyticNode { // should be evaluated over a row that is composed of the child tuple and the buffered // tuple 9: optional Exprs.TExpr order_by_eq + + 10: optional bool is_colocate } struct TMergeNode {