[enhance](mtmv)Improve the performance of obtaining partition/table v… (#39478)

…ersions (#39301)
pick: https://github.com/apache/doris/pull/39301
This commit is contained in:
zhangdong
2024-08-22 00:07:52 +08:00
committed by GitHub
parent b878f7f1a6
commit 77270a0479
12 changed files with 272 additions and 89 deletions

View File

@ -47,6 +47,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
@ -2836,14 +2837,18 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion();
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}
@Override
public MTMVSnapshotIf getTableSnapshot() {
long visibleVersion = getVisibleVersion();
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}

View File

@ -20,9 +20,11 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.rpc.RpcException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -37,6 +39,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Internal representation of partition-related metadata.
@ -168,6 +171,16 @@ public class Partition extends MetaObject implements Writable {
return visibleVersionTime;
}
public static List<Long> getVisibleVersions(List<? extends Partition> partitions) throws RpcException {
if (Config.isCloudMode()) {
// Throwing RPC exceptions is to ensure compatibility with the caller's code
// and avoid different implementations in different versions
throw new RpcException("127.0.0.1", "not implement cloud in current version");
} else {
return partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList());
}
}
/**
* if visibleVersion is 1, do not return creation time but 0
*

View File

@ -32,6 +32,7 @@ import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
@ -749,13 +750,14 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionName);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
}
@Override
public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
@ -177,8 +178,8 @@ public class MTMVTask extends AbstractTask {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings();
this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings);
MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
this.refreshMode = generateRefreshMode(needRefreshPartitions);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
@ -196,8 +197,7 @@ public class MTMVTask extends AbstractTask {
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames,
partitionMappings);
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
@ -432,7 +432,7 @@ public class MTMVTask extends AbstractTask {
}
}
public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> partitionMappings)
public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
@ -450,9 +450,8 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables(),
partitionMappings);
boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}
@ -462,8 +461,7 @@ public class MTMVTask extends AbstractTask {
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(),
partitionMappings);
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
}
public MTMVTaskContext getTaskContext() {

View File

@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import java.util.Map;
public class MTMVBaseVersions {
private final Map<Long, Long> tableVersions;
private final Map<String, Long> partitionVersions;
public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long> partitionVersions) {
this.tableVersions = tableVersions;
this.partitionVersions = partitionVersions;
}
public Map<Long, Long> getTableVersions() {
return tableVersions;
}
public Map<String, Long> getPartitionVersions() {
return partitionVersions;
}
}

View File

@ -26,13 +26,16 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.rpc.RpcException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -71,17 +74,18 @@ public class MTMVPartitionUtil {
/**
* Determine whether the partition is sync with retated partition and other baseTables
*
* @param mtmv
* @param refreshContext
* @param partitionName
* @param relatedPartitionNames
* @param tables
* @param excludedTriggerTables
* @return
* @throws AnalysisException
*/
public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set<String> relatedPartitionNames,
public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, String partitionName,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
MTMV mtmv = refreshContext.getMtmv();
Set<String> relatedPartitionNames = refreshContext.getPartitionMappings().get(partitionName);
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
@ -92,9 +96,10 @@ public class MTMVPartitionUtil {
partitionName, mtmv.getName(), relatedTable.getName());
return false;
}
isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, relatedTable, relatedPartitionNames);
isSyncWithPartition = isSyncWithPartitions(refreshContext, partitionName, relatedPartitionNames);
}
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionName, tables, excludedTriggerTables);
return isSyncWithPartition && isSyncWithAllBaseTables(refreshContext, partitionName, tables,
excludedTriggerTables);
}
@ -192,8 +197,8 @@ public class MTMVPartitionUtil {
return false;
}
try {
return isMTMVSync(mtmv, mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet(),
mtmv.calculatePartitionMappings());
return isMTMVSync(MTMVRefreshContext.buildContext(mtmv), mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@ -203,19 +208,18 @@ public class MTMVPartitionUtil {
/**
* Determine whether the mtmv is sync with tables
*
* @param mtmv
* @param context
* @param tables
* @param excludeTables
* @param partitionMappings
* @return
* @throws AnalysisException
*/
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables,
Map<String, Set<String>> partitionMappings)
public static boolean isMTMVSync(MTMVRefreshContext context, Set<BaseTableInfo> tables, Set<String> excludeTables)
throws AnalysisException {
MTMV mtmv = context.getMtmv();
Set<String> partitionNames = mtmv.getPartitionNames();
for (String partitionName : partitionNames) {
if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), tables,
if (!isMTMVPartitionSync(context, partitionName, tables,
excludeTables)) {
return false;
}
@ -234,17 +238,18 @@ public class MTMVPartitionUtil {
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds)
throws AnalysisException {
Map<Long, List<String>> res = Maps.newHashMap();
Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings();
MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
for (Long partitionId : partitionIds) {
String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName();
res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName, partitionMappings.get(partitionName)));
res.put(partitionId, getPartitionUnSyncTables(context, partitionName));
}
return res;
}
private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partitionName,
Set<String> relatedPartitionNames)
private static List<String> getPartitionUnSyncTables(MTMVRefreshContext context, String partitionName)
throws AnalysisException {
MTMV mtmv = context.getMtmv();
Set<String> relatedPartitionNames = context.getPartitionMappings().get(partitionName);
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) {
TableIf table = MTMVUtil.getTable(baseTableInfo);
@ -262,13 +267,13 @@ public class MTMVPartitionUtil {
res.add(mtmvRelatedTableIf.getName());
continue;
}
boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, mtmvRelatedTableIf,
boolean isSyncWithPartition = isSyncWithPartitions(context, partitionName,
relatedPartitionNames);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
} else {
if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) {
if (!isSyncWithBaseTable(context, partitionName, baseTableInfo)) {
res.add(table.getName());
}
}
@ -279,17 +284,17 @@ public class MTMVPartitionUtil {
/**
* Get the partitions that need to be refreshed
*
* @param mtmv
* @param context
* @param baseTables
* @return
*/
public static List<String> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables,
Map<String, Set<String>> partitionMappings) {
public static List<String> getMTMVNeedRefreshPartitions(MTMVRefreshContext context, Set<BaseTableInfo> baseTables) {
MTMV mtmv = context.getMtmv();
Set<String> partitionNames = mtmv.getPartitionNames();
List<String> res = Lists.newArrayList();
for (String partitionName : partitionNames) {
try {
if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), baseTables,
if (!isMTMVPartitionSync(context, partitionName, baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partitionName);
}
@ -304,16 +309,16 @@ public class MTMVPartitionUtil {
/**
* Compare the current and last updated partition (or table) snapshot of the associated partition (or table)
*
* @param mtmv
* @param context
* @param mtmvPartitionName
* @param relatedTable
* @param relatedPartitionNames
* @return
* @throws AnalysisException
*/
public static boolean isSyncWithPartitions(MTMV mtmv, String mtmvPartitionName,
MTMVRelatedTableIf relatedTable,
public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mtmvPartitionName,
Set<String> relatedPartitionNames) throws AnalysisException {
MTMV mtmv = context.getMtmv();
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
if (!relatedTable.needAutoRefresh()) {
return true;
}
@ -324,7 +329,7 @@ public class MTMVPartitionUtil {
}
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName);
.getPartitionSnapshot(relatedPartitionName, context);
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
@ -397,7 +402,8 @@ public class MTMVPartitionUtil {
* @param excludedTriggerTables
* @return
*/
private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionName, Set<BaseTableInfo> tables,
private static boolean isSyncWithAllBaseTables(MTMVRefreshContext context, String mtmvPartitionName,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
@ -410,7 +416,7 @@ public class MTMVPartitionUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionName, baseTableInfo);
boolean syncWithBaseTable = isSyncWithBaseTable(context, mtmvPartitionName, baseTableInfo);
if (!syncWithBaseTable) {
return false;
}
@ -418,8 +424,10 @@ public class MTMVPartitionUtil {
return true;
}
private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, BaseTableInfo baseTableInfo)
private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mtmvPartitionName,
BaseTableInfo baseTableInfo)
throws AnalysisException {
MTMV mtmv = context.getMtmv();
TableIf table = null;
try {
table = MTMVUtil.getTable(baseTableInfo);
@ -437,7 +445,7 @@ public class MTMVPartitionUtil {
if (!baseTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context);
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
}
@ -445,35 +453,35 @@ public class MTMVPartitionUtil {
/**
* Generate updated snapshots of partitions to determine if they are synchronized
*
* @param mtmv
* @param context
* @param baseTables
* @param partitionNames
* @param partitionMappings
* @return
* @throws AnalysisException
*/
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
Set<BaseTableInfo> baseTables, Set<String> partitionNames,
Map<String, Set<String>> partitionMappings)
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMVRefreshContext context,
Set<BaseTableInfo> baseTables, Set<String> partitionNames)
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (String partitionName : partitionNames) {
res.put(partitionName,
generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionName)));
generatePartitionSnapshot(context, baseTables,
context.getPartitionMappings().get(partitionName)));
}
return res;
}
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefreshContext context,
Set<BaseTableInfo> baseTables, Set<String> relatedPartitionNames)
throws AnalysisException {
MTMV mtmv = context.getMtmv();
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName);
.getPartitionSnapshot(relatedPartitionName, context);
refreshPartitionSnapshot.getPartitions()
.put(relatedPartitionName, partitionSnapshot);
}
@ -487,7 +495,8 @@ public class MTMVPartitionUtil {
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot());
refreshPartitionSnapshot.getTables()
.put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot(context));
}
return refreshPartitionSnapshot;
}
@ -501,4 +510,57 @@ public class MTMVPartitionUtil {
}
throw new AnalysisException("can not getPartitionColumnType by:" + col);
}
public static MTMVBaseVersions getBaseVersions(MTMV mtmv) throws AnalysisException {
return new MTMVBaseVersions(getTableVersions(mtmv), getPartitionVersions(mtmv));
}
private static Map<String, Long> getPartitionVersions(MTMV mtmv) throws AnalysisException {
Map<String, Long> res = Maps.newHashMap();
if (mtmv.getMvPartitionInfo().getPartitionType().equals(MTMVPartitionType.SELF_MANAGE)) {
return res;
}
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
if (!(relatedTable instanceof OlapTable)) {
return res;
}
List<Partition> partitions = Lists.newArrayList(((OlapTable) relatedTable).getPartitions());
List<Long> versions = null;
try {
versions = Partition.getVisibleVersions(partitions);
} catch (RpcException e) {
throw new AnalysisException("getVisibleVersions failed.", e);
}
Preconditions.checkState(partitions.size() == versions.size());
for (int i = 0; i < partitions.size(); i++) {
res.put(partitions.get(i).getName(), versions.get(i));
}
return res;
}
private static Map<Long, Long> getTableVersions(MTMV mtmv) {
Map<Long, Long> res = Maps.newHashMap();
if (mtmv.getRelation() == null || mtmv.getRelation().getBaseTablesOneLevel() == null) {
return res;
}
List<OlapTable> olapTables = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) {
TableIf table = null;
try {
table = MTMVUtil.getTable(baseTableInfo);
} catch (AnalysisException e) {
LOG.info(e);
continue;
}
if (table instanceof OlapTable) {
olapTables.add((OlapTable) table);
}
}
List<Long> versions = OlapTable.getVisibleVersionInBatch(olapTables);
Preconditions.checkState(olapTables.size() == versions.size());
for (int i = 0; i < olapTables.size(); i++) {
res.put(olapTables.get(i).getId(), versions.get(i));
}
return res;
}
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.AnalysisException;
import java.util.Map;
import java.util.Set;
public class MTMVRefreshContext {
private MTMV mtmv;
private Map<String, Set<String>> partitionMappings;
private MTMVBaseVersions baseVersions;
public MTMVRefreshContext(MTMV mtmv) {
this.mtmv = mtmv;
}
public MTMV getMtmv() {
return mtmv;
}
public Map<String, Set<String>> getPartitionMappings() {
return partitionMappings;
}
public MTMVBaseVersions getBaseVersions() {
return baseVersions;
}
public static MTMVRefreshContext buildContext(MTMV mtmv) throws AnalysisException {
MTMVRefreshContext context = new MTMVRefreshContext(mtmv);
context.partitionMappings = mtmv.calculatePartitionMappings();
context.baseVersions = MTMVPartitionUtil.getBaseVersions(mtmv);
return context;
}
}

View File

@ -69,7 +69,7 @@ public interface MTMVRelatedTableIf extends TableIf {
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException;
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException;
/**
* getTableSnapshot
@ -77,7 +77,7 @@ public interface MTMVRelatedTableIf extends TableIf {
* @return table snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getTableSnapshot() throws AnalysisException;
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException;
/**
* Does the current type of table allow timed triggering

View File

@ -31,8 +31,6 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MTMVRewriteUtil {
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
@ -57,7 +55,7 @@ public class MTMVRewriteUtil {
|| mtmv.getStatus().getRefreshState() == MTMVRefreshState.INIT) {
return res;
}
Map<String, Set<String>> partitionMappings = null;
MTMVRefreshContext refreshContext = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
@ -67,11 +65,11 @@ public class MTMVRewriteUtil {
continue;
}
try {
if (partitionMappings == null) {
partitionMappings = mtmv.calculatePartitionMappings();
if (refreshContext == null) {
refreshContext = MTMVRefreshContext.buildContext(mtmv);
}
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTablesOneLevel(),
if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(),
mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet())) {
res.add(partition);
}