diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index e523fc5163..c850cdae58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -95,6 +95,7 @@ import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
+import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughUnion;
import org.apache.doris.nereids.rules.rewrite.PushDownMinMaxThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownSumThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughJoin;
@@ -286,6 +287,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
new PushDownLimit(),
new PushDownTopNThroughJoin(),
new PushDownLimitDistinctThroughJoin(),
+ new PushDownLimitDistinctThroughUnion(),
new PushDownTopNThroughWindow(),
new PushDownTopNThroughUnion()
),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 09e01136d8..0aa71f39ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -269,6 +269,7 @@ public enum RuleType {
// limit distinct push down
PUSH_DOWN_LIMIT_DISTINCT_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSH_DOWN_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
+ PUSH_DOWN_LIMIT_DISTINCT_THROUGH_UNION(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE(RuleTypeClass.REWRITE),
ADJUST_CONJUNCTS_RETURN_TYPE(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java
new file mode 100644
index 0000000000..b685f680c9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * LIMIT-Distinct
+ * -> Union All
+ * -> child plan1
+ * -> child plan2
+ * -> child plan3
+ *
+ * rewritten to
+ *
+ * LIMIT-Distinct
+ * -> Union All
+ * -> LIMIT-Distinct
+ * -> child plan1
+ * -> LIMIT-Distinct
+ * -> LIMIT plan2
+ * -> TopN-Distinct
+ * -> LIMIT plan3
+ *
+ */
+public class PushDownLimitDistinctThroughUnion implements RewriteRuleFactory {
+
+ @Override
+ public List buildRules() {
+ return ImmutableList.of(
+ logicalLimit(logicalAggregate(logicalUnion().when(union -> union.getQualifier() == Qualifier.ALL))
+ .when(agg -> agg.isDistinct()))
+ .then(limit -> {
+ LogicalAggregate agg = limit.child();
+ LogicalUnion union = agg.child();
+
+ List newChildren = new ArrayList<>();
+ for (Plan child : union.children()) {
+ Map replaceMap = new HashMap<>();
+ for (int i = 0; i < union.getOutputs().size(); ++i) {
+ NamedExpression output = union.getOutputs().get(i);
+ replaceMap.put(output, child.getOutput().get(i));
+ }
+
+ List newGroupBy = agg.getGroupByExpressions().stream()
+ .map(expr -> ExpressionUtils.replace(expr, replaceMap))
+ .collect(Collectors.toList());
+ List newOutputs = agg.getOutputs().stream()
+ .map(expr -> ExpressionUtils.replace(expr, replaceMap))
+ .collect(Collectors.toList());
+
+ LogicalAggregate newAgg = new LogicalAggregate<>(newGroupBy, newOutputs, child);
+ LogicalLimit newLimit = limit.withLimitChild(limit.getLimit() + limit.getOffset(),
+ 0, newAgg);
+
+ newChildren.add(newLimit);
+ }
+
+ if (union.children().equals(newChildren)) {
+ return null;
+ }
+ return limit.withChildren(agg.withChildren(union.withChildren(newChildren)));
+ })
+ .toRule(RuleType.PUSH_DOWN_LIMIT_DISTINCT_THROUGH_UNION)
+ );
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java
index 15023cb514..7563ecd516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java
@@ -46,6 +46,7 @@ import java.util.Map;
*
* rewritten to
*
+ * TopN-Distinct
* -> Union All
* -> TopN-Distinct
* -> child plan1
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
index c27254a7ed..3149d140fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.rules.exploration.mv.SlotMapping;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.expression.rules.FoldConstantRule;
import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
@@ -300,6 +301,19 @@ public class ExpressionUtils {
return expr.accept(ExpressionReplacer.INSTANCE, replaceMap);
}
+ /**
+ * replace NameExpression.
+ */
+ public static NamedExpression replace(NamedExpression expr,
+ Map extends Expression, ? extends Expression> replaceMap) {
+ Expression newExpr = expr.accept(ExpressionReplacer.INSTANCE, replaceMap);
+ if (newExpr instanceof NamedExpression) {
+ return (NamedExpression) newExpr;
+ } else {
+ return new Alias(expr.getExprId(), newExpr, expr.getName());
+ }
+ }
+
public static List replace(List exprs,
Map extends Expression, ? extends Expression> replaceMap) {
return exprs.stream()
diff --git a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out
new file mode 100644
index 0000000000..1cb6435483
--- /dev/null
+++ b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !basic --
+PhysicalResultSink
+--PhysicalLimit[GLOBAL]
+----PhysicalDistribute
+------PhysicalLimit[LOCAL]
+--------hashAgg[GLOBAL]
+----------PhysicalDistribute
+------------hashAgg[LOCAL]
+--------------PhysicalUnion
+----------------PhysicalDistribute
+------------------PhysicalLimit[LOCAL]
+--------------------hashAgg[LOCAL]
+----------------------PhysicalProject
+------------------------PhysicalOlapScan[t]
+----------------PhysicalDistribute
+------------------PhysicalLimit[LOCAL]
+--------------------hashAgg[LOCAL]
+----------------------PhysicalProject
+------------------------PhysicalOlapScan[t]
+
diff --git a/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy
new file mode 100644
index 0000000000..cbb509db32
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("push_down_limit_distinct_through_join") {
+ sql "SET enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+
+ sql """
+ DROP TABLE IF EXISTS t;
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS t(
+ `id` int(32),
+ `score` int(64) NULL,
+ `name` varchar(64) NULL
+ ) ENGINE = OLAP
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql "insert into t values (1, 1, 'a')"
+ sql "insert into t values (2, null, 'a')"
+ sql "insert into t values (3, 1, null)"
+ sql "insert into t values (4, 2, 'b')"
+ sql "insert into t values (5, null, 'b')"
+ sql "insert into t values (6, 2, null)"
+ sql "insert into t values (7, 3, 'c')"
+ sql "insert into t values (8, null, 'c')"
+ sql "insert into t values (9, 3, null)"
+ sql "insert into t values (10, null, null)"
+
+ sql "SET ENABLE_NEREIDS_RULES=push_down_limit_distinct_through_join"
+
+ qt_basic """
+ explain shape plan (select id from t t1 union select id from t t2) limit 0, 200;
+ """
+}
\ No newline at end of file