[Fix](nereids) Fix partition check failure (#29642)

Optimize mv rewrite partition check logic and fix check failure and
add more relevant explain info.
This commit is contained in:
seawinde
2024-01-09 11:55:44 +08:00
committed by yiguolei
parent d50c8b6d3a
commit 28695249ea

View File

@ -18,13 +18,10 @@
package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
@ -55,9 +52,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
@ -65,7 +61,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -75,8 +71,6 @@ import java.util.stream.Collectors;
public abstract class AbstractMaterializedViewRule implements ExplorationRuleFactory {
public static final HashSet<JoinType> SUPPORTED_JOIN_TYPE_SET = Sets.newHashSet(JoinType.INNER_JOIN,
JoinType.LEFT_OUTER_JOIN);
protected final String currentClassName = this.getClass().getSimpleName();
private final Logger logger = LogManager.getLogger(this.getClass());
/**
* The abstract template method for query rewrite, it contains the main logic and different query
@ -195,12 +189,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
if (rewrittenPlan == null) {
continue;
}
if (!checkPartitionIsValid(queryStructInfo, materializationContext, cascadesContext)) {
materializationContext.recordFailReason(queryStructInfo.getOriginalPlanId(),
Pair.of("Check partition validation fail",
"the partition used by query is invalid in materialized view"));
continue;
}
// checkout the output logical properties is the same with query
if (!checkOutput(queryPlan, rewrittenPlan, materializationContext)) {
continue;
}
@ -210,6 +199,20 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
cascadesContext.getCurrentJobContext().getRequiredProperties());
Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute();
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
// check the partitions used by rewritten plan is valid or not
Set<Long> invalidPartitionsQueryUsed =
calcInvalidPartitions(rewrittenPlan, materializationContext, cascadesContext);
if (!invalidPartitionsQueryUsed.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo.getOriginalPlanId(),
Pair.of("Check partition query used validation fail",
String.format("the partition used by query is invalid by materialized view,"
+ "invalid partition info query used is %s",
materializationContext.getMTMV().getPartitions().stream()
.filter(partition ->
invalidPartitionsQueryUsed.contains(partition.getId()))
.collect(Collectors.toSet()))));
continue;
}
materializationContext.setSuccess(true);
recordIfRewritten(queryPlan, materializationContext);
rewriteResults.add(rewrittenPlan);
@ -239,62 +242,34 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
* Maybe only just some partitions is valid in materialized view, so we should check if the mv can
* offer the partitions which query used or not.
*/
protected boolean checkPartitionIsValid(StructInfo queryInfo, MaterializationContext materializationContext,
protected Set<Long> calcInvalidPartitions(Plan rewrittenPlan, MaterializationContext materializationContext,
CascadesContext cascadesContext) {
// check partition is valid or not
MTMV mtmv = materializationContext.getMTMV();
PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
// if not partition, if rewrite success, it means mv is available
return true;
return ImmutableSet.of();
}
// check mv related table partition is valid or not
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTable();
if (relatedPartitionTable == null) {
return true;
}
Optional<LogicalOlapScan> relatedTableRelation = queryInfo.getRelations().stream()
.filter(LogicalOlapScan.class::isInstance)
.filter(relation -> relatedPartitionTable.equals(new BaseTableInfo(relation.getTable())))
.map(LogicalOlapScan.class::cast).findFirst();
if (!relatedTableRelation.isPresent()) {
logger.warn("mv is partition update, but related table relation is null");
return false;
}
OlapTable relatedTable = relatedTableRelation.get().getTable();
Map<Long, Set<Long>> mvToBasePartitionMap;
try {
mvToBasePartitionMap = MTMVUtil.getMvToBasePartitions(mtmv, relatedTable);
} catch (AnalysisException e) {
logger.warn("mvRewriteSuccess getMvToBasePartitions fail", e);
return false;
return ImmutableSet.of();
}
// get mv valid partitions
Collection<Partition> mvDataValidPartitions = MTMVUtil.getMTMVCanRewritePartitions(mtmv,
cascadesContext.getConnectContext());
Map<Long, PartitionItem> allPartitions = mvPartitionInfo.getAllPartitions();
if (!allPartitions.isEmpty() && mvDataValidPartitions.isEmpty()) {
// do not have valid partition
return false;
}
// get mv related table valid partitions
Set<Long> relatedTalbeValidSet = mvDataValidPartitions.stream().map(partition -> {
Set<Long> relatedBaseTablePartitions = mvToBasePartitionMap.get(partition.getId());
if (relatedBaseTablePartitions == null || relatedBaseTablePartitions.isEmpty()) {
return ImmutableList.of();
} else {
return relatedBaseTablePartitions;
}
}).flatMap(Collection::stream).map(Long.class::cast).collect(Collectors.toSet());
// get query selected partitions to make the partitions is valid or not
Set<Long> relatedTableSelectedPartitionToCheck = new HashSet<>(
relatedTableRelation.get().getSelectedPartitionIds());
if (relatedTableSelectedPartitionToCheck.isEmpty()) {
relatedTableSelectedPartitionToCheck.addAll(relatedTable.getPartitionIds());
}
return !relatedTalbeValidSet.isEmpty() && relatedTalbeValidSet.containsAll(
relatedTableSelectedPartitionToCheck);
Set<Long> mvDataValidPartitionIdSet = MTMVUtil.getMTMVCanRewritePartitions(mtmv,
cascadesContext.getConnectContext()).stream()
.map(Partition::getId)
.collect(Collectors.toSet());
Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
&& Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()))
.stream()
.map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds())
.flatMap(Collection::stream)
.collect(Collectors.toSet());
queryUsedPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
return queryUsedPartitionIdSet;
}
/**