diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index e21a4ee289..eb472d8884 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.catalog.constraint.ForeignKeyConstraint; import org.apache.doris.catalog.constraint.PrimaryKeyConstraint; import org.apache.doris.catalog.constraint.UniqueConstraint; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -375,6 +376,12 @@ public interface TableIf { return null; } + default List getFullQualifiers() { + return ImmutableList.of(getDatabase().getCatalog().getName(), + ClusterNamespace.getNameFromFullName(getDatabase().getFullName()), + getName()); + } + default boolean isManagedTable() { return getType() == TableType.OLAP || getType() == TableType.MATERIALIZED_VIEW; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java new file mode 100644 index 0000000000..b221e507b8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Range; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Update mv by partition + */ +public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { + private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) { + super(logicalQuery, Optional.empty()); + } + + /** + * Construct command + * + * @param mv materialize view + * @param partitions update partitions in mv and tables + * @param tableWithPartKey the partitions key for different table + * @return command + */ + public static UpdateMvByPartitionCommand from(MTMV mv, Set partitions, + Map tableWithPartKey) { + NereidsParser parser = new NereidsParser(); + Map> predicates = + constructTableWithPredicates(partitions, tableWithPartKey); + List parts = constructPartsForMv(mv, partitions); + Plan plan = parser.parseSingle(mv.getQuerySql()); + plan = plan.accept(new PredicateAdder(), predicates); + UnboundTableSink sink = + new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(), + parts, plan); + return new UpdateMvByPartitionCommand(sink); + } + + private static List constructPartsForMv(MTMV mv, Set partitions) { + return mv.getPartitionNames().stream() + .filter(name -> { + PartitionItem mvPartItem = mv.getPartitionInfo().getItem(mv.getPartition(name).getId()); + return partitions.stream().anyMatch(p -> p.getIntersect(mvPartItem) != null); + }) + .collect(ImmutableList.toImmutableList()); + } + + private static Map> constructTableWithPredicates(Set partitions, + Map tableWithPartKey) { + ImmutableMap.Builder> builder = new ImmutableMap.Builder<>(); + tableWithPartKey.forEach((table, colName) -> + builder.put(table, constructPredicates(partitions, colName)) + ); + return builder.build(); + } + + private static Set constructPredicates(Set partitions, String colName) { + UnboundSlot slot = new UnboundSlot(colName); + return partitions.stream() + .map(item -> convertPartitionItemToPredicate(item, slot)) + .collect(ImmutableSet.toImmutableSet()); + } + + private static Expression convertPartitionItemToPredicate(PartitionItem item, Slot col) { + if (item instanceof ListPartitionItem) { + List inValues = ((ListPartitionItem) item).getItems().stream() + .map(key -> Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0)))) + .collect(ImmutableList.toImmutableList()); + return new InPredicate(col, inValues); + } else { + Range range = item.getItems(); + List exprs = new ArrayList<>(); + if (range.hasLowerBound()) { + PartitionKey key = range.lowerEndpoint(); + exprs.add(new GreaterThanEqual(col, Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0))))); + } + if (range.hasUpperBound()) { + PartitionKey key = range.upperEndpoint(); + exprs.add(new LessThan(col, Literal.fromLegacyLiteral(key.getKeys().get(0), + Type.fromPrimitiveType(key.getTypes().get(0))))); + } + return ExpressionUtils.and(exprs); + } + } + + static class PredicateAdder extends DefaultPlanRewriter>> { + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map> predicates) { + return new LogicalFilter<>(predicates.get(scan.getTable()), scan); + } + } +}