[opt](mtmv) Improve the mv rewrite performance by optimize code usage (#35674)

Improve the performance from two points, one is optimize decide model
method and another is to reuse the mv struc info:

1. Instead of use java.util.List#containsAll by
java.util.Set#containsAll in method
AbstractMaterializedViewRule#decideMatchMode

2. Reuse the mv struct info in different query, because mv struct info
is immutable.

Notes: tableBitSet in struct info is relevant to the statementContext
in cascadesContext, if reuse the mv struct info for different query,
we should re generate table bitset and construct new struct info with
method StructInfo#withTableBitSet
This commit is contained in:
seawinde
2024-05-31 18:01:45 +08:00
committed by yiguolei
parent 48d4601ee3
commit 5315df36c0
6 changed files with 106 additions and 58 deletions

View File

@ -25,7 +25,9 @@ import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
import org.apache.doris.nereids.rules.rewrite.EliminateSort;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
@ -38,6 +40,9 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import java.util.BitSet;
import java.util.Optional;
/**
* The cache for materialized view cache
*/
@ -49,11 +54,13 @@ public class MTMVCache {
// The original plan of mv def sql
private final Plan originalPlan;
private final Statistics statistics;
private final StructInfo structInfo;
public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics) {
public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics, StructInfo structInfo) {
this.logicalPlan = logicalPlan;
this.originalPlan = originalPlan;
this.statistics = statistics;
this.structInfo = structInfo;
}
public Plan getLogicalPlan() {
@ -68,6 +75,10 @@ public class MTMVCache {
return statistics;
}
public StructInfo getStructInfo() {
return structInfo;
}
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
@ -96,6 +107,11 @@ public class MTMVCache {
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan, planner.getCascadesContext().getMemo().getRoot().getStatistics());
// Construct structInfo once for use later
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan,
planner.getCascadesContext(),
new BitSet());
return new MTMVCache(mvPlan, originPlan, planner.getCascadesContext().getMemo().getRoot().getStatistics(),
structInfoOptional.orElseGet(() -> null));
}
}

View File

@ -21,7 +21,7 @@ import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
@ -653,21 +653,23 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
* @see MatchMode
*/
private MatchMode decideMatchMode(List<CatalogRelation> queryRelations, List<CatalogRelation> viewRelations) {
List<TableIf> queryTableRefs = queryRelations.stream().map(CatalogRelation::getTable)
.collect(Collectors.toList());
List<TableIf> viewTableRefs = viewRelations.stream().map(CatalogRelation::getTable)
.collect(Collectors.toList());
boolean sizeSame = viewTableRefs.size() == queryTableRefs.size();
boolean queryPartial = viewTableRefs.containsAll(queryTableRefs);
if (!sizeSame && queryPartial) {
return MatchMode.QUERY_PARTIAL;
Set<TableIdentifier> queryTables = new HashSet<>();
for (CatalogRelation catalogRelation : queryRelations) {
queryTables.add(new TableIdentifier(catalogRelation.getTable()));
}
boolean viewPartial = queryTableRefs.containsAll(viewTableRefs);
if (!sizeSame && viewPartial) {
Set<TableIdentifier> viewTables = new HashSet<>();
for (CatalogRelation catalogRelation : viewRelations) {
viewTables.add(new TableIdentifier(catalogRelation.getTable()));
}
if (queryTables.equals(viewTables)) {
return MatchMode.COMPLETE;
}
if (queryTables.containsAll(viewTables)) {
return MatchMode.VIEW_PARTIAL;
}
if (sizeSame && queryPartial && viewPartial) {
return MatchMode.COMPLETE;
if (viewTables.containsAll(queryTables)) {
return MatchMode.QUERY_PARTIAL;
}
return MatchMode.NOT_MATCH;
}

View File

@ -55,8 +55,9 @@ public class AsyncMaterializationContext extends MaterializationContext {
* MaterializationContext, this contains necessary info for query rewriting by mv
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, cascadesContext), cascadesContext);
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, cascadesContext),
cascadesContext, structInfo);
this.mtmv = mtmv;
}

View File

@ -36,6 +36,7 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.BitSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -90,9 +91,16 @@ public class InitMaterializationContextHook implements PlannerHook {
if (mtmvCache == null) {
continue;
}
// For async materialization context, the cascades context when construct the struct info maybe
// different from the current cascadesContext
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(), ImmutableList.of(),
cascadesContext));
cascadesContext, mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
}
}
}

View File

@ -82,7 +82,8 @@ public abstract class MaterializationContext {
protected boolean success = false;
// Mark enable record failure detail info or not, because record failure detail info is performance-depleting
protected final boolean enableRecordFailureDetail;
// The materialization plan struct info
// The materialization plan struct info, construct struct info is expensive,
// this should be constructed once for all query for performance
protected final StructInfo structInfo;
// Group id set that are rewritten unsuccessfully by this materialization for reducing rewrite times
protected final Set<GroupId> matchedFailGroups = new HashSet<>();
@ -96,8 +97,8 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan,
Plan scanPlan, CascadesContext cascadesContext) {
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;
@ -115,24 +116,36 @@ public abstract class MaterializationContext {
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Construct materialization struct info, catch exception which may cause planner roll back
if (structInfo == null) {
Optional<StructInfo> structInfoOptional = constructStructInfo(plan, cascadesContext, new BitSet());
if (!structInfoOptional.isPresent()) {
this.available = false;
}
this.structInfo = structInfoOptional.orElseGet(() -> null);
} else {
this.structInfo = structInfo;
}
}
/**
* Construct materialized view Struct info
*/
public static Optional<StructInfo> constructStructInfo(Plan plan, CascadesContext cascadesContext,
BitSet expectedTableBitSet) {
List<StructInfo> viewStructInfos;
try {
viewStructInfos = MaterializedViewUtils.extractStructInfo(plan, cascadesContext, new BitSet());
viewStructInfos = MaterializedViewUtils.extractStructInfo(plan, cascadesContext, expectedTableBitSet);
if (viewStructInfos.size() > 1) {
// view struct info should only have one, log error and use the first struct info
LOG.warn(String.format("view strut info is more than one, materialization scan plan is %s, "
+ "materialization plan is %s",
scanPlan.treeString(), plan.treeString()));
LOG.warn(String.format("view strut info is more than one, materialization plan is %s",
plan.treeString()));
}
} catch (Exception exception) {
LOG.warn(String.format("construct materialization struct info fail, materialization scan plan is %s, "
+ "materialization plan is %s",
scanPlan.treeString(), plan.treeString()), exception);
this.available = false;
this.structInfo = null;
return;
LOG.warn(String.format("construct materialization struct info fail, materialization plan is %s",
plan.treeString()), exception);
return Optional.empty();
}
this.structInfo = viewStructInfos.get(0);
return Optional.of(viewStructInfos.get(0));
}
public boolean alreadyRewrite(GroupId groupId) {

View File

@ -98,6 +98,9 @@ public class StructInfo {
// bottom plan which top plan only contain join or scan. this is needed by hyper graph
private final Plan bottomPlan;
private final List<CatalogRelation> relations;
// This is generated by cascadesContext, this may be different in different cascadesContext
// So if the cascadesContext currently is different form the cascadesContext which generated it.
// Should regenerate the tableBitSet by current cascadesContext and call withTableBitSet method
private final BitSet tableBitSet;
// this is for LogicalCompatibilityContext later
private final Map<RelationId, StructInfoNode> relationIdStructInfoNodeMap;
@ -156,6 +159,15 @@ public class StructInfo {
this.shuttledExpressionsToExpressionsMap, this.namedExprIdAndExprMapping, this.tableBitSet);
}
/**
* Construct StructInfo with new tableBitSet
*/
public StructInfo withTableBitSet(BitSet tableBitSet) {
return new StructInfo(this.originalPlan, this.originalPlanId, this.hyperGraph, this.valid, this.topPlan,
this.bottomPlan, this.relations, this.relationIdStructInfoNodeMap, this.predicates,
this.shuttledExpressionsToExpressionsMap, this.namedExprIdAndExprMapping, tableBitSet);
}
private static boolean collectStructInfoFromGraph(HyperGraph hyperGraph,
Plan topPlan,
Map<ExpressionPosition, Map<Expression, Expression>> shuttledExpressionsToExpressionsMap,
@ -174,12 +186,30 @@ public class StructInfo {
relations.addAll(nodeRelations);
nodeRelations.forEach(relation -> hyperTableBitSet.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
// plan relation collector and set to map
StructInfoNode structInfoNode = (StructInfoNode) node;
// record expressions in node
if (structInfoNode.getExpressions() != null) {
structInfoNode.getExpressions().forEach(expression -> {
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
Lists.newArrayList(expression), ImmutableSet.of(),
ImmutableSet.of(), new BitSet());
structInfoNode.getPlan().accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
List<Expression> replacedExpressions = replaceContext.getReplacedExpressions();
putShuttledExpressionsToExpressionsMap(shuttledExpressionsToExpressionsMap,
ExpressionPosition.NODE, replacedExpressions.get(0), expression);
// Record this, will be used in top level expression shuttle later, see the method
// ExpressionLineageReplacer#visitGroupPlan
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
});
}
// every node should only have one relation, this is for LogicalCompatibilityContext
if (!nodeRelations.isEmpty()) {
relationIdStructInfoNodeMap.put(nodeRelations.get(0).getRelationId(), (StructInfoNode) node);
relationIdStructInfoNodeMap.put(nodeRelations.get(0).getRelationId(), structInfoNode);
}
});
// Collect expression from join condition in hyper graph
for (JoinEdge edge : hyperGraph.getJoinEdges()) {
List<Expression> hashJoinConjuncts = edge.getHashJoinConjuncts();
@ -190,7 +220,7 @@ public class StructInfo {
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
Lists.newArrayList(conjunctExpr), ImmutableSet.of(),
ImmutableSet.of(), hyperTableBitSet);
ImmutableSet.of(), new BitSet());
topPlan.accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
List<Expression> replacedExpressions = replaceContext.getReplacedExpressions();
@ -205,28 +235,6 @@ public class StructInfo {
return false;
}
}
// Record expressions in node
hyperGraph.getNodes().forEach(node -> {
// plan relation collector and set to map
StructInfoNode structInfoNode = (StructInfoNode) node;
// record expressions in node
if (structInfoNode.getExpressions() != null) {
structInfoNode.getExpressions().forEach(expression -> {
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
Lists.newArrayList(expression), ImmutableSet.of(),
ImmutableSet.of(), hyperTableBitSet);
structInfoNode.getPlan().accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
List<Expression> replacedExpressions = replaceContext.getReplacedExpressions();
putShuttledExpressionsToExpressionsMap(shuttledExpressionsToExpressionsMap,
ExpressionPosition.NODE, replacedExpressions.get(0), expression);
// Record this, will be used in top level expression shuttle later, see the method
// ExpressionLineageReplacer#visitGroupPlan
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
});
}
});
// Collect expression from where in hyper graph
hyperGraph.getFilterEdges().forEach(filterEdge -> {
List<? extends Expression> filterExpressions = filterEdge.getExpressions();
@ -235,7 +243,7 @@ public class StructInfo {
ExpressionUtils.extractConjunction(predicate).forEach(expr ->
putShuttledExpressionsToExpressionsMap(shuttledExpressionsToExpressionsMap,
ExpressionPosition.FILTER_EDGE,
ExpressionUtils.shuttleExpressionWithLineage(predicate, topPlan, hyperTableBitSet),
ExpressionUtils.shuttleExpressionWithLineage(predicate, topPlan, new BitSet()),
predicate));
});
});