diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 10739be853..05622bfc0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -112,7 +112,12 @@ public class MTMV extends OlapTable { } public MTMVRefreshInfo getRefreshInfo() { - return refreshInfo; + readMvLock(); + try { + return refreshInfo; + } finally { + readMvUnlock(); + } } public String getQuerySql() { @@ -120,8 +125,8 @@ public class MTMV extends OlapTable { } public MTMVStatus getStatus() { + readMvLock(); try { - readMvLock(); return status; } finally { readMvUnlock(); @@ -133,11 +138,21 @@ public class MTMV extends OlapTable { } public MTMVJobInfo getJobInfo() { - return jobInfo; + readMvLock(); + try { + return jobInfo; + } finally { + readMvUnlock(); + } } public MTMVRelation getRelation() { - return relation; + readMvLock(); + try { + return relation; + } finally { + readMvUnlock(); + } } public void setCache(MTMVCache cache) { @@ -145,12 +160,17 @@ public class MTMV extends OlapTable { } public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) { - return refreshInfo.updateNotNull(newRefreshInfo); + writeMvLock(); + try { + return refreshInfo.updateNotNull(newRefreshInfo); + } finally { + writeMvUnlock(); + } } public MTMVStatus alterStatus(MTMVStatus newStatus) { + writeMvLock(); try { - writeMvLock(); return this.status.updateNotNull(newStatus); } finally { writeMvUnlock(); @@ -159,8 +179,8 @@ public class MTMV extends OlapTable { public void addTaskResult(MTMVTask task, MTMVRelation relation, Map partitionSnapshots) { + writeMvLock(); try { - writeMvLock(); if (task.getStatus() == TaskStatus.SUCCESS) { this.status.setState(MTMVState.NORMAL); this.status.setSchemaChangeDetail(null); @@ -185,41 +205,66 @@ public class MTMV extends OlapTable { } public Map alterMvProperties(Map mvProperties) { - this.mvProperties.putAll(mvProperties); - return this.mvProperties; + writeMvLock(); + try { + this.mvProperties.putAll(mvProperties); + return this.mvProperties; + } finally { + writeMvUnlock(); + } } public long getGracePeriod() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { - return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; - } else { - return 0L; + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { + return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; + } else { + return 0L; + } + } finally { + readMvUnlock(); } } public Optional getWorkloadGroup() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils - .isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) { - return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)); + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils + .isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) { + return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)); + } + return Optional.empty(); + } finally { + readMvUnlock(); } - return Optional.empty(); } public int getRefreshPartitionNum() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { - int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); - return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; - } else { - return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { + int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); + return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; + } else { + return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; + } + } finally { + readMvUnlock(); } } public Set getExcludedTriggerTables() { - if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { - return Sets.newHashSet(); + readMvLock(); + try { + if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { + return Sets.newHashSet(); + } + String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); + return Sets.newHashSet(split); + } finally { + readMvUnlock(); } - String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); - return Sets.newHashSet(split); } public MTMVCache getOrGenerateCache() throws AnalysisException { @@ -237,7 +282,12 @@ public class MTMV extends OlapTable { } public Map getMvProperties() { - return mvProperties; + readMvLock(); + try { + return mvProperties; + } finally { + readMvUnlock(); + } } public MTMVPartitionInfo getMvPartitionInfo() { @@ -254,7 +304,7 @@ public class MTMV extends OlapTable { * @return mvPartitionId ==> mvPartitionKeyDesc */ public Map generateMvPartitionDescs() { - Map mtmvItems = getPartitionItems(); + Map mtmvItems = getAndCopyPartitionItems(); Map result = Maps.newHashMap(); for (Entry entry : mtmvItems.entrySet()) { result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); @@ -276,7 +326,7 @@ public class MTMV extends OlapTable { return Maps.newHashMap(); } Map> res = new HashMap<>(); - Map relatedPartitionItems = mvPartitionInfo.getRelatedTable().getPartitionItems(); + Map relatedPartitionItems = mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(); int relatedColPos = mvPartitionInfo.getRelatedColPos(); for (Entry entry : relatedPartitionItems.entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); @@ -303,7 +353,7 @@ public class MTMV extends OlapTable { } Map> res = Maps.newHashMap(); Map> relatedPartitionDescs = generateRelatedPartitionDescs(); - Map mvPartitionItems = getPartitionInfo().getIdToItem(false); + Map mvPartitionItems = getAndCopyPartitionItems(); for (Entry entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index df34f4333a..2acdf00a49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1053,7 +1053,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } public List getPartitionIds() { - return new ArrayList<>(idToPartition.keySet()); + readLock(); + try { + return new ArrayList<>(idToPartition.keySet()); + } finally { + readUnlock(); + } } public Set getCopiedBfColumns() { @@ -2572,8 +2577,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public Map getPartitionItems() { - return getPartitionInfo().getIdToItem(false); + public Map getAndCopyPartitionItems() { + readLock(); + try { + return Maps.newHashMap(getPartitionInfo().getIdToItem(false)); + } finally { + readUnlock(); + } } @Override @@ -2595,7 +2605,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { @Override public String getPartitionName(long partitionId) throws AnalysisException { - return getPartitionOrAnalysisException(partitionId).getName(); + readLock(); + try { + return getPartitionOrAnalysisException(partitionId).getName(); + } finally { + readUnlock(); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index d095a959e9..47b684c264 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -773,7 +773,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public Map getPartitionItems() { + public Map getAndCopyPartitionItems() { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -816,7 +816,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI long partitionId = 0L; long maxVersionTime = 0L; long visibleVersionTime; - for (Entry entry : getPartitionItems().entrySet()) { + for (Entry entry : getAndCopyPartitionItems().entrySet()) { visibleVersionTime = getPartitionLastModifyTime(entry.getKey()); if (visibleVersionTime > maxVersionTime) { maxVersionTime = visibleVersionTime; @@ -831,7 +831,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } private HivePartition getPartitionById(long partitionId) throws AnalysisException { - PartitionItem item = getPartitionItems().get(partitionId); + PartitionItem item = getAndCopyPartitionItems().get(partitionId); List> partitionValuesList = transferPartitionItemToPartitionValues(item); List partitions = getPartitionsByPartitionValues(partitionValuesList); if (partitions.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 88fe02a8b4..9c85725b5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -141,7 +141,7 @@ public class MTMVPartitionUtil { throws AnalysisException { int pos = getPos(relatedTable, relatedCol); Set res = Sets.newHashSet(); - for (Entry entry : relatedTable.getPartitionItems().entrySet()) { + for (Entry entry : relatedTable.getAndCopyPartitionItems().entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos); res.add(partitionKeyDesc); } @@ -163,18 +163,24 @@ public class MTMVPartitionUtil { public static List getPartitionNamesByIds(MTMV mtmv, Collection ids) throws AnalysisException { List res = Lists.newArrayList(); for (Long partitionId : ids) { - res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); + res.add(mtmv.getPartitionName(partitionId)); } return res; } public static List getPartitionsIdsByNames(MTMV mtmv, List partitions) throws AnalysisException { - List res = Lists.newArrayList(); - for (String partitionName : partitions) { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); - res.add(partition.getId()); + mtmv.readLock(); + try { + List res = Lists.newArrayList(); + for (String partitionName : partitions) { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); + res.add(partition.getId()); + } + return res; + } finally { + mtmv.readUnlock(); } - return res; + } /** @@ -209,9 +215,9 @@ public class MTMVPartitionUtil { public static boolean isMTMVSync(MTMV mtmv, Set tables, Set excludeTables, Map> partitionMappings) throws AnalysisException { - Collection partitions = mtmv.getPartitions(); - for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), tables, + List partitionIds = mtmv.getPartitionIds(); + for (Long partitionId : partitionIds) { + if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), tables, excludeTables)) { return false; } @@ -277,16 +283,16 @@ public class MTMVPartitionUtil { */ public static List getMTMVNeedRefreshPartitions(MTMV mtmv, Set baseTables, Map> partitionMappings) { - Collection allPartitions = mtmv.getPartitions(); + List partitionIds = mtmv.getPartitionIds(); List res = Lists.newArrayList(); - for (Partition partition : allPartitions) { + for (Long partitionId : partitionIds) { try { - if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), baseTables, + if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), baseTables, mtmv.getExcludedTriggerTables())) { - res.add(partition.getId()); + res.add(partitionId); } } catch (AnalysisException e) { - res.add(partition.getId()); + res.add(partitionId); LOG.warn("check isMTMVPartitionSync failed", e); } } @@ -361,6 +367,7 @@ public class MTMVPartitionUtil { /** * add partition for mtmv like relatedPartitionId of relatedTable + * `Env.getCurrentEnv().addPartition` has obtained the lock internally, but we do not obtain the lock here * * @param mtmv * @param oldPartitionKeyDesc @@ -448,7 +455,7 @@ public class MTMVPartitionUtil { throws AnalysisException { Map res = Maps.newHashMap(); for (Long partitionId : partitionIds) { - res.put(mtmv.getPartition(partitionId).getName(), + res.put(mtmv.getPartitionName(partitionId), generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId))); } return res; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 46454679b5..5ec7e98a40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf { * * @return partitionId->PartitionItem */ - Map getPartitionItems(); + Map getAndCopyPartitionItems(); /** * getPartitionType LIST/RANGE/UNPARTITIONED diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index df38ce1872..bf819553e8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -71,6 +71,10 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = Lists.newArrayList(p1); + mtmv.getPartitionIds(); + minTimes = 0; + result = Lists.newArrayList(1L); + p1.getId(); minTimes = 0; result = 1L;