From fc762f426b0bdbd188bbc9bf19c540b4c06fae79 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 5 Feb 2024 11:40:10 +0800 Subject: [PATCH] [enhance](mtmv) mtmv disable hive auto refresh (#30775) - If the `related table` is `hive`, do not refresh automatically - If the `related table` is `hive`, the partition col is allowed to be `null`. Otherwise, it must be `not null` - add more `ut` --- .../org/apache/doris/catalog/OlapTable.java | 9 + .../catalog/external/HMSExternalTable.java | 10 + .../doris/common/proc/PartitionsProcDir.java | 4 +- .../doris/common/proc/TablesProcDir.java | 3 +- .../doris/job/extensions/mtmv/MTMVTask.java | 36 +- .../apache/doris/mtmv/MTMVPartitionUtil.java | 438 +++++++++++++++++ .../apache/doris/mtmv/MTMVRelatedTableIf.java | 15 + .../doris/mtmv/MTMVRelationManager.java | 3 +- .../apache/doris/mtmv/MTMVRewriteUtil.java | 87 ++++ .../org/apache/doris/mtmv/MTMVService.java | 6 +- .../java/org/apache/doris/mtmv/MTMVUtil.java | 452 +----------------- .../mv/AbstractMaterializedViewRule.java | 6 +- .../exploration/mv/MaterializedViewUtils.java | 2 +- .../plans/commands/info/RefreshMTMVInfo.java | 4 +- .../tablefunction/MetadataGenerator.java | 4 +- .../doris/mtmv/MTMVPartitionUtilTest.java | 187 ++++++++ .../doris/mtmv/MTMVRefreshSnapshotTest.java | 96 ++++ .../doris/mtmv/MTMVRewriteUtilTest.java | 254 ++++++++++ .../org/apache/doris/mtmv/MTMVTaskTest.java | 163 +++++++ .../mv/MaterializedViewUtilsTest.java | 2 - .../data/mtmv_p0/test_hive_mtmv.out | 5 + .../suites/mtmv_p0/test_hive_mtmv.groovy | 8 + 22 files changed, 1313 insertions(+), 481 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index bc847dd22c..61459d1a56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2596,4 +2596,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return getPartitionOrAnalysisException(partitionId).getName(); } + @Override + public boolean needAutoRefresh() { + return true; + } + + @Override + public boolean isPartitionColumnAllowNull() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index aa4258baad..c31ba11a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -885,6 +885,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI ((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive()); return partitionValuesList; } + + @Override + public boolean needAutoRefresh() { + return false; + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 01feaf2368..4703429fa1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -45,7 +45,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -309,7 +309,7 @@ public class PartitionsProcDir implements ProcDirInterface { partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); if (olapTable instanceof MTMV) { try { - List partitionUnSyncTables = MTMVUtil + List partitionUnSyncTables = MTMVPartitionUtil .getPartitionUnSyncTables((MTMV) olapTable, partitionId); partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); partitionInfo.add(partitionUnSyncTables.toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java index b3ce9be35c..c2926c829a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java @@ -23,7 +23,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.ListComparator; @@ -91,7 +90,7 @@ public class TablesProcDir implements ProcDirInterface { String partitionKey = FeConstants.null_string; table.readLock(); try { - if (table.getType() == TableType.OLAP) { + if (table instanceof OlapTable) { OlapTable olapTable = (OlapTable) table; if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) { partitionNum = olapTable.getPartitions().size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 194172a673..6a861200e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -18,17 +18,14 @@ package org.apache.doris.job.extensions.mtmv; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; @@ -36,6 +33,7 @@ import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; @@ -145,6 +143,13 @@ public class MTMVTask extends AbstractTask { this.taskContext = Objects.requireNonNull(taskContext); } + // only for test + public MTMVTask(MTMV mtmv, MTMVRelation relation, MTMVTaskContext taskContext) { + this.mtmv = mtmv; + this.relation = relation; + this.taskContext = taskContext; + } + @Override public void run() throws JobException { LOG.info("mtmv task run, taskId: {}", super.getTaskId()); @@ -161,10 +166,10 @@ public class MTMVTask extends AbstractTask { // To be completely consistent with hive, you need to manually refresh the cache // refreshHmsTable(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); } List needRefreshPartitionIds = calculateNeedRefreshPartitions(); - this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); + this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; @@ -181,8 +186,8 @@ public class MTMVTask extends AbstractTask { Set execPartitionIds = Sets.newHashSet(needRefreshPartitionIds .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); // need get names before exec - List execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); - Map execPartitionSnapshots = MTMVUtil + List execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + Map execPartitionSnapshots = MTMVPartitionUtil .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds); exec(ctx, execPartitionIds, tableWithPartKey); completedPartitions.addAll(execPartitionNames); @@ -241,7 +246,7 @@ public class MTMVTask extends AbstractTask { LOG.info("mtmv task before, taskId: {}", super.getTaskId()); super.before(); try { - mtmv = getMTMV(); + mtmv = MTMVUtil.getMTMV(dbId, mtmvId); } catch (UserException e) { LOG.warn("before task failed:", e); throw new JobException(e); @@ -267,11 +272,6 @@ public class MTMVTask extends AbstractTask { } } - private MTMV getMTMV() throws DdlException, MetaNotFoundException { - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); - return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); - } - @Override public void runTask() throws JobException { LOG.info("mtmv task runTask, taskId: {}", super.getTaskId()); @@ -296,7 +296,7 @@ public class MTMVTask extends AbstractTask { String dbName = ""; String mvName = ""; try { - MTMV mtmv = getMTMV(); + MTMV mtmv = MTMVUtil.getMTMV(dbId, mtmvId); dbName = mtmv.getQualifiedDbName(); mvName = mtmv.getName(); } catch (UserException e) { @@ -386,20 +386,20 @@ public class MTMVTask extends AbstractTask { } } - private List calculateNeedRefreshPartitions() throws AnalysisException { + public List calculateNeedRefreshPartitions() throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { if (taskContext.isComplete()) { return mtmv.getPartitionIds(); } else if (!CollectionUtils .isEmpty(taskContext.getPartitions())) { - return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); + return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); } } // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); + boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables()); if (fresh) { return Lists.newArrayList(); } @@ -413,7 +413,7 @@ public class MTMVTask extends AbstractTask { } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables()); + return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables()); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java new file mode 100644 index 0000000000..a8e3e11869 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -0,0 +1,438 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.AddPartitionClause; +import org.apache.doris.analysis.DropPartitionClause; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.SinglePartitionDesc; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +public class MTMVPartitionUtil { + private static final Logger LOG = LogManager.getLogger(MTMVPartitionUtil.class); + + /** + * Determine whether the partition is sync with retated partition and other baseTables + * + * @param mtmv + * @param partitionId + * @param tables + * @param excludedTriggerTables + * @return + * @throws AnalysisException + */ + public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set tables, + Set excludedTriggerTables) throws AnalysisException { + boolean isSyncWithPartition = true; + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + // if follow base table, not need compare with related table, only should compare with related partition + excludedTriggerTables.add(relatedTable.getName()); + PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); + Map relatedPartitionItems = relatedTable.getPartitionItems(); + long relatedPartitionId = getExistPartitionId(item, + relatedPartitionItems); + if (relatedPartitionId == -1L) { + LOG.warn("can not found related partition: " + partitionId); + return false; + } + isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); + } + return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); + + } + + /** + * Align the partitions of mtmv and related tables, delete more and add less + * + * @param mtmv + * @param relatedTable + * @throws DdlException + * @throws AnalysisException + */ + public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable) + throws DdlException, AnalysisException { + Map relatedTableItems = relatedTable.getPartitionItems(); + Map mtmvItems = mtmv.getPartitionItems(); + // drop partition of mtmv + for (Entry entry : mtmvItems.entrySet()) { + long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); + if (partitionId == -1L) { + dropPartition(mtmv, entry.getKey()); + } + } + // add partition for mtmv + for (Entry entry : relatedTableItems.entrySet()) { + long partitionId = getExistPartitionId(entry.getValue(), mtmvItems); + if (partitionId == -1L) { + addPartition(mtmv, entry.getValue()); + } + } + } + + public static List getPartitionNamesByIds(MTMV mtmv, Collection ids) throws AnalysisException { + List res = Lists.newArrayList(); + for (Long partitionId : ids) { + res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); + } + return res; + } + + public static List getPartitionsIdsByNames(MTMV mtmv, List partitions) throws AnalysisException { + List res = Lists.newArrayList(); + for (String partitionName : partitions) { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); + res.add(partition.getId()); + } + return res; + } + + /** + * check if table is sync with all baseTables + * + * @param mtmv + * @return + */ + public static boolean isMTMVSync(MTMV mtmv) { + MTMVRelation mtmvRelation = mtmv.getRelation(); + if (mtmvRelation == null) { + return false; + } + try { + return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet()); + } catch (AnalysisException e) { + LOG.warn("isMTMVSync failed: ", e); + return false; + } + } + + /** + * Determine whether the mtmv is sync with tables + * + * @param mtmv + * @param tables + * @param excludeTables + * @return + * @throws AnalysisException + */ + public static boolean isMTMVSync(MTMV mtmv, Set tables, Set excludeTables) + throws AnalysisException { + Collection partitions = mtmv.getPartitions(); + for (Partition partition : partitions) { + if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) { + return false; + } + } + return true; + } + + /** + * get not sync tables + * + * @param mtmv + * @param partitionId + * @return + * @throws AnalysisException + */ + public static List getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { + List res = Lists.newArrayList(); + for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { + TableIf table = MTMVUtil.getTable(baseTableInfo); + if (!(table instanceof MTMVRelatedTableIf)) { + continue; + } + MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table; + if (!mtmvRelatedTableIf.needAutoRefresh()) { + continue; + } + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { + PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); + Map relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems(); + long relatedPartitionId = getExistPartitionId(item, + relatedPartitionItems); + if (relatedPartitionId == -1L) { + throw new AnalysisException("can not found related partition"); + } + boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf, + relatedPartitionId); + if (!isSyncWithPartition) { + res.add(mtmvRelatedTableIf.getName()); + } + } else { + if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) { + res.add(table.getName()); + } + } + } + return res; + } + + /** + * Get the partitions that need to be refreshed + * + * @param mtmv + * @param baseTables + * @return + */ + public static List getMTMVNeedRefreshPartitions(MTMV mtmv, Set baseTables) { + Collection allPartitions = mtmv.getPartitions(); + List res = Lists.newArrayList(); + for (Partition partition : allPartitions) { + try { + if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables, + mtmv.getExcludedTriggerTables())) { + res.add(partition.getId()); + } + } catch (AnalysisException e) { + res.add(partition.getId()); + LOG.warn("check isMTMVPartitionSync failed", e); + } + } + return res; + } + + /** + * compare last update time of mtmvPartition and tablePartition + * + * @param mtmv + * @param mtmvPartitionId + * @param relatedTable + * @param relatedPartitionId + * @return + * @throws AnalysisException + */ + public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, + MTMVRelatedTableIf relatedTable, + Long relatedPartitionId) throws AnalysisException { + if (!relatedTable.needAutoRefresh()) { + return true; + } + MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable + .getPartitionSnapshot(relatedPartitionId); + String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); + String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); + return mtmv.getRefreshSnapshot() + .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot); + } + + /** + * like p_00000101_20170201 + * + * @param desc + * @return + */ + public static String generatePartitionName(PartitionKeyDesc desc) { + String partitionName = "p_"; + partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "") + .replaceAll("\\(|\\)|\\,|\\[|\\]", "_"); + if (partitionName.length() > 50) { + partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName)) + + "_" + System.currentTimeMillis(); + } + return partitionName; + } + + /** + * drop partition of mtmv + * + * @param mtmv + * @param partitionId + */ + private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { + if (!mtmv.writeLockIfExist()) { + return; + } + try { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); + DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); + Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); + } finally { + mtmv.writeUnlock(); + } + + } + + /** + * add partition for mtmv like relatedPartitionId of relatedTable + * + * @param mtmv + * @param partitionItem + * @throws DdlException + */ + private static void addPartition(MTMV mtmv, PartitionItem partitionItem) + throws DdlException { + PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc(); + Map partitionProperties = Maps.newHashMap(); + SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, + generatePartitionName(oldPartitionKeyDesc), + oldPartitionKeyDesc, partitionProperties); + + AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, + mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); + Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); + } + + /** + * compare PartitionItem and return equals partitionId + * if not found, return -1L + * + * @param target + * @param sources + * @return + */ + private static long getExistPartitionId(PartitionItem target, Map sources) { + for (Entry entry : sources.entrySet()) { + if (target.equals(entry.getValue())) { + return entry.getKey(); + } + } + return -1L; + } + + /** + * Determine is sync, ignoring excludedTriggerTables and non OlapTanle + * + * @param mtmvPartitionId + * @param tables + * @param excludedTriggerTables + * @return + */ + private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set tables, + Set excludedTriggerTables) throws AnalysisException { + for (BaseTableInfo baseTableInfo : tables) { + TableIf table = null; + try { + table = MTMVUtil.getTable(baseTableInfo); + } catch (AnalysisException e) { + LOG.warn("get table failed, {}", baseTableInfo, e); + return false; + } + if (excludedTriggerTables.contains(table.getName())) { + continue; + } + boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo); + if (!syncWithBaseTable) { + return false; + } + } + return true; + } + + private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo) + throws AnalysisException { + TableIf table = null; + try { + table = MTMVUtil.getTable(baseTableInfo); + } catch (AnalysisException e) { + LOG.warn("get table failed, {}", baseTableInfo, e); + return false; + } + + if (!(table instanceof MTMVRelatedTableIf)) { + // if not MTMVRelatedTableIf, we can not get snapshot from it, + // Currently, it is believed to be synchronous + return true; + } + MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table; + if (!baseTable.needAutoRefresh()) { + return true; + } + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); + String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); + return mtmv.getRefreshSnapshot() + .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); + } + + public static Map generatePartitionSnapshots(MTMV mtmv, + Set baseTables, Set partitionIds) + throws AnalysisException { + Map res = Maps.newHashMap(); + for (Long partitionId : partitionIds) { + res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId)); + } + return res; + } + + + private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, + Set baseTables, Long partitionId) + throws AnalysisException { + MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + List relatedPartitionIds = getMTMVPartitionRelatedPartitions( + mtmv.getPartitionItems().get(partitionId), + relatedTable); + + for (Long relatedPartitionId : relatedPartitionIds) { + MTMVSnapshotIf partitionSnapshot = relatedTable + .getPartitionSnapshot(relatedPartitionId); + refreshPartitionSnapshot.getPartitions() + .put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot); + } + } + for (BaseTableInfo baseTableInfo : baseTables) { + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { + continue; + } + TableIf table = MTMVUtil.getTable(baseTableInfo); + if (!(table instanceof MTMVRelatedTableIf)) { + continue; + } + refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); + } + return refreshPartitionSnapshot; + } + + private static List getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem, + MTMVRelatedTableIf relatedTable) { + List res = Lists.newArrayList(); + Map relatedPartitionItems = relatedTable.getPartitionItems(); + for (Entry entry : relatedPartitionItems.entrySet()) { + if (mtmvPartitionItem.equals(entry.getValue())) { + res.add(entry.getKey()); + // current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table + return res; + } + } + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 51773db0df..46454679b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -87,4 +87,19 @@ public interface MTMVRelatedTableIf extends TableIf { * @throws AnalysisException */ String getPartitionName(long partitionId) throws AnalysisException; + + /** + * Does the current type of table allow timed triggering + * + * @return If return false,The method of comparing whether to synchronize will directly return true, + * otherwise the snapshot information will be compared + */ + boolean needAutoRefresh(); + + /** + * if allow partition column `isAllowNull` + * + * @return + */ + boolean isPartitionColumnAllowNull(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 77414e4fc8..aa7ffd2426 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -71,7 +71,8 @@ public class MTMVRelationManager implements MTMVHookService { for (BaseTableInfo tableInfo : mvInfos) { try { MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo); - if (!CollectionUtils.isEmpty(MTMVUtil.getMTMVCanRewritePartitions(mtmv, ctx))) { + if (!CollectionUtils + .isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, System.currentTimeMillis()))) { res.add(mtmv); } } catch (AnalysisException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java new file mode 100644 index 0000000000..666a79eba9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.List; + +public class MTMVRewriteUtil { + private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class); + + /** + * Determine which partition of mtmv can be rewritten + * + * @param mtmv + * @param ctx + * @return + */ + public static Collection getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx, + long currentTimeMills) { + List res = Lists.newArrayList(); + Collection allPartitions = mtmv.getPartitions(); + // check session variable if enable rewrite + if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) { + return res; + } + if (MTMVUtil.mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable() + .isMaterializedViewRewriteEnableContainExternalTable()) { + return res; + } + + MTMVRelation mtmvRelation = mtmv.getRelation(); + if (mtmvRelation == null) { + return res; + } + // check mv is normal + if (!(mtmv.getStatus().getState() == MTMVState.NORMAL + && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { + return res; + } + // check gracePeriod + long gracePeriodMills = mtmv.getGracePeriod(); + for (Partition partition : allPartitions) { + if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() + + gracePeriodMills)) { + res.add(partition); + continue; + } + try { + if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), + Sets.newHashSet())) { + res.add(partition); + } + } catch (AnalysisException e) { + // ignore it + LOG.warn("check isMTMVPartitionSync failed", e); + } + } + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 6abb22f3e5..227166e56d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -84,10 +84,10 @@ public class MTMVService { public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException { Objects.requireNonNull(mtmv); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); - } LOG.info("createMTMV: " + mtmv.getName()); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable()); + } for (MTMVHookService mtmvHookService : hooks.values()) { mtmvHookService.createMTMV(mtmv); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 74593e5def..3b97e35141 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -17,39 +17,19 @@ package org.apache.doris.mtmv; -import org.apache.doris.analysis.AddPartitionClause; -import org.apache.doris.analysis.DropPartitionClause; -import org.apache.doris.analysis.PartitionKeyDesc; -import org.apache.doris.analysis.SinglePartitionDesc; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; -import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; public class MTMVUtil { - private static final Logger LOG = LogManager.getLogger(MTMVUtil.class); /** * get Table by BaseTableInfo @@ -66,380 +46,18 @@ public class MTMVUtil { return table; } - /** - * Determine whether the partition is sync with retated partition and other baseTables - * - * @param mtmv - * @param partitionId - * @param tables - * @param excludedTriggerTables - * @return - * @throws AnalysisException - */ - private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set tables, - Set excludedTriggerTables) throws AnalysisException { - boolean isSyncWithPartition = true; - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - // if follow base table, not need compare with related table, only should compare with related partition - excludedTriggerTables.add(relatedTable.getName()); - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map relatedPartitionItems = relatedTable.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { - LOG.warn("can not found related partition: " + partitionId); - return false; - } - isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); - } - return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); - + public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotFoundException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); + return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); } /** - * Align the partitions of mtmv and related tables, delete more and add less - * - * @param mtmv - * @param relatedTable - * @throws DdlException - * @throws AnalysisException - */ - public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable) - throws DdlException, AnalysisException { - Map relatedTableItems = relatedTable.getPartitionItems(); - Map mtmvItems = mtmv.getPartitionItems(); - // drop partition of mtmv - for (Entry entry : mtmvItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); - if (partitionId == -1L) { - dropPartition(mtmv, entry.getKey()); - } - } - // add partition for mtmv - for (Entry entry : relatedTableItems.entrySet()) { - long partitionId = getExistPartitionId(entry.getValue(), mtmvItems); - if (partitionId == -1L) { - addPartition(mtmv, entry.getValue()); - } - } - } - - public static List getPartitionNamesByIds(MTMV mtmv, Collection ids) throws AnalysisException { - List res = Lists.newArrayList(); - for (Long partitionId : ids) { - res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); - } - return res; - } - - public static List getPartitionsIdsByNames(MTMV mtmv, List partitions) throws AnalysisException { - List res = Lists.newArrayList(); - for (String partitionName : partitions) { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); - res.add(partition.getId()); - } - return res; - } - - /** - * check if table is sync with all baseTables + * if base tables of mtmv contains external table * * @param mtmv * @return */ - public static boolean isMTMVSync(MTMV mtmv) { - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return false; - } - try { - return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet()); - } catch (AnalysisException e) { - LOG.warn("isMTMVSync failed: ", e); - return false; - } - } - - /** - * Determine whether the mtmv is sync with tables - * - * @param mtmv - * @param tables - * @param excludeTables - * @return - * @throws AnalysisException - */ - public static boolean isMTMVSync(MTMV mtmv, Set tables, Set excludeTables) - throws AnalysisException { - Collection partitions = mtmv.getPartitions(); - for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) { - return false; - } - } - return true; - } - - /** - * get not sync tables - * - * @param mtmv - * @param partitionId - * @return - * @throws AnalysisException - */ - public static List getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { - List res = Lists.newArrayList(); - for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { - TableIf table = getTable(baseTableInfo); - if (!(table instanceof MTMVRelatedTableIf)) { - continue; - } - MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table; - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv - .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); - Map relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems(); - long relatedPartitionId = getExistPartitionId(item, - relatedPartitionItems); - if (relatedPartitionId == -1L) { - throw new AnalysisException("can not found related partition"); - } - boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf, - relatedPartitionId); - if (!isSyncWithPartition) { - res.add(mtmvRelatedTableIf.getName()); - } - } else { - if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) { - res.add(table.getName()); - } - } - } - return res; - } - - /** - * Determine which partition of mtmv can be rewritten - * - * @param mtmv - * @param ctx - * @return - */ - public static Collection getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) { - List res = Lists.newArrayList(); - Collection allPartitions = mtmv.getPartitions(); - // check session variable if enable rewrite - if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) { - return res; - } - if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable() - .isMaterializedViewRewriteEnableContainExternalTable()) { - return res; - } - - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return res; - } - // check mv is normal - if (!(mtmv.getStatus().getState() == MTMVState.NORMAL - && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { - return res; - } - // check gracePeriod - long gracePeriodMills = mtmv.getGracePeriod(); - long currentTimeMills = System.currentTimeMillis(); - for (Partition partition : allPartitions) { - if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime() - + gracePeriodMills)) { - res.add(partition); - continue; - } - try { - if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) { - res.add(partition); - } - } catch (AnalysisException e) { - // ignore it - LOG.warn("check isMTMVPartitionSync failed", e); - } - } - return res; - } - - /** - * Get the partitions that need to be refreshed - * - * @param mtmv - * @param baseTables - * @return - */ - public static List getMTMVNeedRefreshPartitions(MTMV mtmv, Set baseTables) { - Collection allPartitions = mtmv.getPartitions(); - List res = Lists.newArrayList(); - for (Partition partition : allPartitions) { - try { - if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables, - mtmv.getExcludedTriggerTables())) { - res.add(partition.getId()); - } - } catch (AnalysisException e) { - res.add(partition.getId()); - LOG.warn("check isMTMVPartitionSync failed", e); - } - } - return res; - } - - /** - * compare last update time of mtmvPartition and tablePartition - * - * @param mtmv - * @param mtmvPartitionId - * @param relatedTable - * @param relatedPartitionId - * @return - * @throws AnalysisException - */ - private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, - MTMVRelatedTableIf relatedTable, - Long relatedPartitionId) throws AnalysisException { - MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - return mtmv.getRefreshSnapshot() - .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot); - } - - /** - * like p_00000101_20170201 - * - * @param desc - * @return - */ - private static String generatePartitionName(PartitionKeyDesc desc) { - String partitionName = "p_"; - partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "") - .replaceAll("\\(|\\)|\\,|\\[|\\]", "_"); - if (partitionName.length() > 50) { - partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName)) - + "_" + System.currentTimeMillis(); - } - return partitionName; - } - - /** - * drop partition of mtmv - * - * @param mtmv - * @param partitionId - */ - private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { - if (!mtmv.writeLockIfExist()) { - return; - } - try { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); - DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); - Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); - } finally { - mtmv.writeUnlock(); - } - - } - - /** - * add partition for mtmv like relatedPartitionId of relatedTable - * - * @param mtmv - * @param partitionItem - * @throws DdlException - */ - private static void addPartition(MTMV mtmv, PartitionItem partitionItem) - throws DdlException { - PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc(); - Map partitionProperties = Maps.newHashMap(); - SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, - generatePartitionName(oldPartitionKeyDesc), - oldPartitionKeyDesc, partitionProperties); - - AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, - mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); - Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); - } - - /** - * compare PartitionItem and return equals partitionId - * if not found, return -1L - * - * @param target - * @param sources - * @return - */ - private static long getExistPartitionId(PartitionItem target, Map sources) { - for (Entry entry : sources.entrySet()) { - if (target.equals(entry.getValue())) { - return entry.getKey(); - } - } - return -1L; - } - - /** - * Determine is sync, ignoring excludedTriggerTables and non OlapTanle - * - * @param mtmvPartitionId - * @param tables - * @param excludedTriggerTables - * @return - */ - private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set tables, - Set excludedTriggerTables) throws AnalysisException { - for (BaseTableInfo baseTableInfo : tables) { - TableIf table = null; - try { - table = getTable(baseTableInfo); - } catch (AnalysisException e) { - LOG.warn("get table failed, {}", baseTableInfo, e); - return false; - } - if (excludedTriggerTables.contains(table.getName())) { - continue; - } - boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo); - if (!syncWithBaseTable) { - return false; - } - } - return true; - } - - private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo) - throws AnalysisException { - TableIf table = null; - try { - table = getTable(baseTableInfo); - } catch (AnalysisException e) { - LOG.warn("get table failed, {}", baseTableInfo, e); - return false; - } - - if (!(table instanceof MTMVRelatedTableIf)) { - // if not MTMVRelatedTableIf, we can not get snapshot from it, - // Currently, it is believed to be synchronous - return true; - } - MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table; - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - return mtmv.getRefreshSnapshot() - .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); - } - - private static boolean mtmvContainsExternalTable(MTMV mtmv) { + public static boolean mtmvContainsExternalTable(MTMV mtmv) { Set baseTables = mtmv.getRelation().getBaseTables(); for (BaseTableInfo baseTableInfo : baseTables) { if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) { @@ -448,60 +66,4 @@ public class MTMVUtil { } return false; } - - public static Map generatePartitionSnapshots(MTMV mtmv, - Set baseTables, Set partitionIds) - throws AnalysisException { - Map res = Maps.newHashMap(); - for (Long partitionId : partitionIds) { - res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId)); - } - return res; - } - - - private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, - Set baseTables, Long partitionId) - throws AnalysisException { - MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - List relatedPartitionIds = getMTMVPartitionRelatedPartitions( - mtmv.getPartitionItems().get(partitionId), - relatedTable); - - for (Long relatedPartitionId : relatedPartitionIds) { - MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - refreshPartitionSnapshot.getPartitions() - .put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot); - } - } - for (BaseTableInfo baseTableInfo : baseTables) { - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv - .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - continue; - } - TableIf table = getTable(baseTableInfo); - if (!(table instanceof MTMVRelatedTableIf)) { - continue; - } - refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); - } - return refreshPartitionSnapshot; - } - - private static List getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem, - MTMVRelatedTableIf relatedTable) { - List res = Lists.newArrayList(); - Map relatedPartitionItems = relatedTable.getPartitionItems(); - for (Entry entry : relatedPartitionItems.entrySet()) { - if (mtmvPartitionItem.equals(entry.getValue())) { - res.add(entry.getKey()); - // current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table - return res; - } - } - return res; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index bac4059c16..12f409a594 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -25,7 +25,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVRewriteUtil; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.memo.GroupExpression; @@ -315,8 +315,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac return ImmutableSet.of(); } // get mv valid partitions - Set mvDataValidPartitionIdSet = MTMVUtil.getMTMVCanRewritePartitions(mtmv, - cascadesContext.getConnectContext()).stream() + Set mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, + cascadesContext.getConnectContext(), System.currentTimeMillis()).stream() .map(Partition::getId) .collect(Collectors.toSet()); Set queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index e7b11b5fd1..baf72cc278 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -255,7 +255,7 @@ public class MaterializedViewUtils { Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get(); if (partitionColumnSet.contains(mvReferenceColumn)) { context.addTableColumn(table, mvReferenceColumn); - context.setPctPossible(true); + context.setPctPossible(!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull()); } return visit(relation, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java index f280e86781..5598c81259 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.util.Utils; @@ -67,7 +67,7 @@ public class RefreshMTMVInfo { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); if (!CollectionUtils.isEmpty(partitions)) { - MTMVUtil.getPartitionsIdsByNames(mtmv, partitions); + MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, partitions); } } catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) { throw new AnalysisException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 2150558649..2e0de09d29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -33,7 +33,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.qe.ConnectContext; @@ -634,7 +634,7 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString())); trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString())); trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString())); - trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv))); + trow.addToColumnValue(new TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv))); dataBatch.add(trow); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java new file mode 100644 index 0000000000..4bb74bfd44 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +public class MTMVPartitionUtilTest { + @Mocked + private MTMV mtmv; + @Mocked + private Partition p1; + @Mocked + private MTMVRelation relation; + @Mocked + private BaseTableInfo baseTableInfo; + @Mocked + private MTMVPartitionInfo mtmvPartitionInfo; + @Mocked + private OlapTable baseOlapTable; + @Mocked + private MTMVSnapshotIf baseSnapshotIf; + @Mocked + private MTMVRefreshSnapshot refreshSnapshot; + @Mocked + private MTMVUtil mtmvUtil; + + private Set baseTables = Sets.newHashSet(); + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + baseTables.add(baseTableInfo); + new Expectations() { + { + mtmv.getRelation(); + minTimes = 0; + result = relation; + + mtmv.getPartitions(); + minTimes = 0; + result = Lists.newArrayList(p1); + + p1.getId(); + minTimes = 0; + result = 1L; + + mtmv.getMvPartitionInfo(); + minTimes = 0; + result = mtmvPartitionInfo; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.SELF_MANAGE; + + mtmvUtil.getTable(baseTableInfo); + minTimes = 0; + result = baseOlapTable; + + baseOlapTable.needAutoRefresh(); + minTimes = 0; + result = true; + + baseOlapTable.getTableSnapshot(); + minTimes = 0; + result = baseSnapshotIf; + + mtmv.getPartitionName(anyLong); + minTimes = 0; + result = "p1"; + + mtmv.getRefreshSnapshot(); + minTimes = 0; + result = refreshSnapshot; + + refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + minTimes = 0; + result = true; + + relation.getBaseTables(); + minTimes = 0; + result = baseTables; + + baseOlapTable.needAutoRefresh(); + minTimes = 0; + result = true; + + baseOlapTable.getPartitionSnapshot(anyLong); + minTimes = 0; + result = baseSnapshotIf; + + baseOlapTable.getPartitionName(anyLong); + minTimes = 0; + result = "p1"; + + refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any); + minTimes = 0; + result = true; + } + }; + } + + @Test + public void testIsMTMVSyncNormal() { + boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv); + Assert.assertTrue(mtmvSync); + } + + @Test + public void testIsMTMVSyncNotSync() { + new Expectations() { + { + refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + minTimes = 0; + result = false; + } + }; + boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv); + Assert.assertFalse(mtmvSync); + } + + @Test + public void testIsSyncWithPartition() throws AnalysisException { + boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + Assert.assertTrue(isSyncWithPartition); + } + + @Test + public void testIsSyncWithPartitionNotSync() throws AnalysisException { + new Expectations() { + { + refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any); + minTimes = 0; + result = false; + } + }; + boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L); + Assert.assertFalse(isSyncWithPartition); + } + + @Test + public void testGeneratePartitionName() { + List> inValues = Lists.newArrayList(); + inValues.add(Lists.newArrayList(new PartitionValue("value11"), new PartitionValue("value12"))); + inValues.add(Lists.newArrayList(new PartitionValue("value21"), new PartitionValue("value22"))); + PartitionKeyDesc inDesc = PartitionKeyDesc.createIn(inValues); + String inName = MTMVPartitionUtil.generatePartitionName(inDesc); + Assert.assertEquals("p_value11_value12_value21_value22", inName); + + PartitionKeyDesc rangeDesc = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue(1L)), + Lists.newArrayList(new PartitionValue(2L)) + ); + String rangeName = MTMVPartitionUtil.generatePartitionName(rangeDesc); + Assert.assertEquals("p_1_2", rangeName); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java new file mode 100644 index 0000000000..42b5b78384 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +public class MTMVRefreshSnapshotTest { + private String mvExistPartitionName = "mvp1"; + private String relatedExistPartitionName = "p1"; + private long baseExistTableId = 1L; + private long correctVersion = 1L; + private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot(); + private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion); + private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion); + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + Map partitionSnapshots = Maps.newHashMap(); + MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot(); + partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot); + mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot); + mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot); + refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName)); + } + + @Test + public void testPartitionSync() { + // normal + boolean sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertTrue(sync); + // non exist mv partition + sync = refreshSnapshot.equalsWithRelatedPartition("mvp2", relatedExistPartitionName, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // non exist related partition + sync = refreshSnapshot + .equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // snapshot value not equal + sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVVersionSnapshot(2L)); + Assert.assertFalse(sync); + // snapshot type not equal + sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName, + new MTMVTimestampSnapshot(correctVersion)); + Assert.assertFalse(sync); + } + + @Test + public void testTableSync() { + // normal + boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + new MTMVVersionSnapshot(correctVersion)); + Assert.assertTrue(sync); + // non exist mv partition + sync = refreshSnapshot + .equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // non exist related partition + sync = refreshSnapshot + .equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion)); + Assert.assertFalse(sync); + // snapshot value not equal + sync = refreshSnapshot + .equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L)); + Assert.assertFalse(sync); + // snapshot type not equal + sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + new MTMVTimestampSnapshot(correctVersion)); + Assert.assertFalse(sync); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java new file mode 100644 index 0000000000..55394897e4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; + +public class MTMVRewriteUtilTest { + @Mocked + private MTMV mtmv; + @Mocked + private ConnectContext ctx; + @Mocked + private SessionVariable sessionVariable; + @Mocked + private Partition p1; + @Mocked + private MTMVRelation relation; + @Mocked + private MTMVStatus status; + @Mocked + private MTMVPartitionUtil mtmvPartitionUtil; + @Mocked + private MTMVUtil mtmvUtil; + private long currentTimeMills = 3L; + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + + new Expectations() { + { + mtmv.getPartitions(); + minTimes = 0; + result = Lists.newArrayList(p1); + + p1.getVisibleVersionTime(); + minTimes = 0; + result = 1L; + + mtmv.getGracePeriod(); + minTimes = 0; + result = 0L; + + mtmv.getRelation(); + minTimes = 0; + result = relation; + + mtmv.getStatus(); + minTimes = 0; + result = status; + + mtmv.getGracePeriod(); + minTimes = 0; + result = 0L; + + status.getState(); + minTimes = 0; + result = MTMVState.NORMAL; + + status.getRefreshState(); + minTimes = 0; + result = MTMVRefreshState.SUCCESS; + + ctx.getSessionVariable(); + minTimes = 0; + result = sessionVariable; + + sessionVariable.isEnableMaterializedViewRewrite(); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = true; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set) any, (Set) any); + minTimes = 0; + result = true; + + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = false; + } + }; + } + + @Test + public void testGetMTMVCanRewritePartitionsNormal() { + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsInGracePeriod() throws AnalysisException { + new Expectations() { + { + mtmv.getGracePeriod(); + minTimes = 0; + result = 2L; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set) any, (Set) any); + minTimes = 0; + result = false; + } + }; + + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws AnalysisException { + new Expectations() { + { + mtmv.getGracePeriod(); + minTimes = 0; + result = 1L; + + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set) any, (Set) any); + minTimes = 0; + result = false; + } + }; + + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() { + new Expectations() { + { + sessionVariable.isEnableMaterializedViewRewrite(); + minTimes = 0; + result = false; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { + new Expectations() { + { + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set) any, (Set) any); + minTimes = 0; + result = false; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsEnableContainExternalTable() { + new Expectations() { + { + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = true; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(1, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsDisableContainExternalTable() { + new Expectations() { + { + MTMVUtil.mtmvContainsExternalTable((MTMV) any); + minTimes = 0; + result = true; + + sessionVariable.isMaterializedViewRewriteEnableContainExternalTable(); + minTimes = 0; + result = false; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsStateAbnormal() { + new Expectations() { + { + status.getState(); + minTimes = 0; + result = MTMVState.SCHEMA_CHANGE; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + + @Test + public void testGetMTMVCanRewritePartitionsRefreshStateAbnormal() { + new Expectations() { + { + status.getRefreshState(); + minTimes = 0; + result = MTMVRefreshState.FAIL; + } + }; + Collection mtmvCanRewritePartitions = MTMVRewriteUtil + .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills); + Assert.assertEquals(0, mtmvCanRewritePartitions.size()); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java new file mode 100644 index 0000000000..b1fc52dad4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; +import org.apache.doris.job.extensions.mtmv.MTMVTaskContext; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.apache.commons.collections.CollectionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +public class MTMVTaskTest { + private long poneId = 1L; + private String poneName = "p1"; + private long ptwoId = 2L; + private String ptwoName = "p2"; + private List allPartitionIds = Lists.newArrayList(poneId, ptwoId); + private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(), Sets.newHashSet()); + + @Mocked + private MTMV mtmv; + @Mocked + private MTMVUtil mtmvUtil; + @Mocked + private MTMVPartitionUtil mtmvPartitionUtil; + @Mocked + private MTMVPartitionInfo mtmvPartitionInfo; + @Mocked + private MTMVRefreshInfo mtmvRefreshInfo; + + @Before + public void setUp() + throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { + + new Expectations() { + { + mtmvUtil.getMTMV(anyLong, anyLong); + minTimes = 0; + result = mtmv; + + mtmv.getPartitionIds(); + minTimes = 0; + result = allPartitionIds; + + mtmv.getMvPartitionInfo(); + minTimes = 0; + result = mtmvPartitionInfo; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.FOLLOW_BASE_TABLE; + + mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(poneName)); + minTimes = 0; + result = poneId; + + mtmvPartitionUtil.isMTMVSync(mtmv, (Set) any, (Set) any); + minTimes = 0; + result = true; + + mtmv.getRefreshInfo(); + minTimes = 0; + result = mtmvRefreshInfo; + + mtmvRefreshInfo.getRefreshMethod(); + minTimes = 0; + result = RefreshMethod.COMPLETE; + } + }; + } + + @Test + public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(allPartitionIds, result); + } + + @Test + public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(Lists.newArrayList(poneId), result); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException { + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List result = task.calculateNeedRefreshPartitions(); + Assert.assertTrue(CollectionUtils.isEmpty(result)); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException { + new Expectations() { + { + mtmvPartitionUtil.isMTMVSync(mtmv, (Set) any, (Set) any); + minTimes = 0; + result = false; + } + }; + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(allPartitionIds, result); + } + + @Test + public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws AnalysisException { + new Expectations() { + { + mtmvPartitionUtil.isMTMVSync(mtmv, (Set) any, (Set) any); + minTimes = 0; + result = false; + + mtmvRefreshInfo.getRefreshMethod(); + minTimes = 0; + result = RefreshMethod.AUTO; + + mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, (Set) any); + minTimes = 0; + result = Lists.newArrayList(ptwoId); + } + }; + MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + MTMVTask task = new MTMVTask(mtmv, relation, context); + List result = task.calculateNeedRefreshPartitions(); + Assert.assertEquals(Lists.newArrayList(ptwoId), result); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java index 4204bbe022..02fb18edbf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtilsTest.java @@ -26,7 +26,6 @@ import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Optional; @@ -252,7 +251,6 @@ public class MaterializedViewUtilsTest extends TestWithFeService { } @Test - @Disabled public void getRelatedTableInfoTestWithoutGroupNullTest() { PlanChecker.from(connectContext) .checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias, l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, " diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out b/regression-test/data/mtmv_p0/test_hive_mtmv.out index 9ee89dd033..26e34af7b5 100644 --- a/regression-test/data/mtmv_p0/test_hive_mtmv.out +++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out @@ -8,6 +8,11 @@ 1 A 20230101 2 B 20230101 3 C 20230101 + +-- !refresh_complete -- +1 A 20230101 +2 B 20230101 +3 C 20230101 4 D 20230102 5 E 20230102 6 F 20230102 diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy index 573f1f84d5..cf34cfb616 100644 --- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy @@ -57,12 +57,20 @@ suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive") order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by id" //refresh other partitions + // current, for hive, auto refresh will not change data sql """ REFRESH MATERIALIZED VIEW ${mvName} """ waitingMTMVTaskFinished(jobName) order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by id" + //refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_refresh_complete "SELECT * FROM ${mvName} order by id" + sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalog_name}"""