From 17ffd301fbc56d350fa44021d31322ac93301edc Mon Sep 17 00:00:00 2001 From: seawinde Date: Thu, 20 Mar 2025 10:01:04 +0800 Subject: [PATCH] [fix](mtmv) Fix collecting mv candidates when dml controlled by enable_dml_materialized_view_rewrite switch #48374 (#49263) ### What problem does this PR solve? pr: https://github.com/apache/doris/pull/48374 commitId: 0c9ce720 --- .../org/apache/doris/nereids/CascadesContext.java | 6 +++--- .../apache/doris/nereids/StatementContext.java | 5 +++-- .../doris/nereids/jobs/executor/Analyzer.java | 2 -- ...r.java => TableCollectAndHookInitializer.java} | 8 +++++--- .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../analysis/AddInitMaterializationHook.java | 15 +++++++++++---- .../nereids/rules/analysis/CollectRelation.java | 12 +++++++++++- .../java/org/apache/doris/qe/StmtExecutor.java | 4 ---- 8 files changed, 34 insertions(+), 19 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/{TableCollector.java => TableCollectAndHookInitializer.java} (85%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 2587047639..1486f03e26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -23,7 +23,7 @@ import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.jobs.Job; import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.jobs.executor.Analyzer; -import org.apache.doris.nereids.jobs.executor.TableCollector; +import org.apache.doris.nereids.jobs.executor.TableCollectAndHookInitializer; import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob; import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob; import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext; @@ -224,8 +224,8 @@ public class CascadesContext implements ScheduleContext { this.memo = new Memo(getConnectContext(), plan); } - public TableCollector newTableCollector() { - return new TableCollector(this); + public TableCollectAndHookInitializer newTableCollector() { + return new TableCollectAndHookInitializer(this); } public Analyzer newAnalyzer() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 09e20926dd..de7f041a46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -73,6 +73,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -204,7 +205,7 @@ public class StatementContext implements Closeable { private FormatOptions formatOptions = FormatOptions.getDefault(); - private final List plannerHooks = new ArrayList<>(); + private final Set plannerHooks = new HashSet<>(); private final Map snapshots = Maps.newHashMap(); @@ -624,7 +625,7 @@ public class StatementContext implements Closeable { return formatOptions; } - public List getPlannerHooks() { + public Set getPlannerHooks() { return plannerHooks; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java index 3a111a7f4d..5b8e52c028 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java @@ -19,7 +19,6 @@ package org.apache.doris.nereids.jobs.executor; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.rewrite.RewriteJob; -import org.apache.doris.nereids.rules.analysis.AddInitMaterializationHook; import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet; import org.apache.doris.nereids.rules.analysis.AnalyzeCTE; import org.apache.doris.nereids.rules.analysis.BindExpression; @@ -102,7 +101,6 @@ public class Analyzer extends AbstractBatchJobExecutor { bottomUp(new BindExpression()), topDown(new BindSink()), bottomUp(new CheckAfterBind()), - bottomUp(new AddInitMaterializationHook()), bottomUp( new ProjectToGlobalAggregate(), // this rule check's the logicalProject node's isDistinct property diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollectAndHookInitializer.java similarity index 85% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollector.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollectAndHookInitializer.java index 0ae433262e..01ce6687ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TableCollectAndHookInitializer.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.jobs.executor; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.rewrite.RewriteJob; +import org.apache.doris.nereids.rules.analysis.AddInitMaterializationHook; import org.apache.doris.nereids.rules.analysis.CollectRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalView; @@ -30,7 +31,7 @@ import java.util.List; * Bind symbols according to metadata in the catalog, perform semantic analysis, etc. * TODO: revisit the interface after subquery analysis is supported. */ -public class TableCollector extends AbstractBatchJobExecutor { +public class TableCollectAndHookInitializer extends AbstractBatchJobExecutor { public static final List COLLECT_JOBS = buildCollectTableJobs(); @@ -39,7 +40,7 @@ public class TableCollector extends AbstractBatchJobExecutor { * * @param cascadesContext current context for analyzer */ - public TableCollector(CascadesContext cascadesContext) { + public TableCollectAndHookInitializer(CascadesContext cascadesContext) { super(cascadesContext); } @@ -59,12 +60,13 @@ public class TableCollector extends AbstractBatchJobExecutor { private static List buildCollectTableJobs() { return notTraverseChildrenOf( ImmutableSet.of(LogicalView.class), - TableCollector::buildCollectorJobs + TableCollectAndHookInitializer::buildCollectorJobs ); } private static List buildCollectorJobs() { return jobs( + topDown(new AddInitMaterializationHook()), topDown(new CollectRelation()) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 776681fd88..e401330a65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -41,6 +41,7 @@ public enum RuleType { BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE), + INIT_MATERIALIZATION_HOOK_FOR_RESULT_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_FILE(RuleTypeClass.REWRITE), BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE), BINDING_RELATION(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AddInitMaterializationHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AddInitMaterializationHook.java index bf19c4311d..8e52942c99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AddInitMaterializationHook.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AddInitMaterializationHook.java @@ -20,7 +20,7 @@ package org.apache.doris.nereids.rules.analysis; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.exploration.mv.InitConsistentMaterializationContextHook; -import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; import com.google.common.collect.ImmutableList; @@ -28,7 +28,7 @@ import java.util.List; /** * Add init materialization hook for table sink and file sink - * */ + */ public class AddInitMaterializationHook implements AnalysisRuleFactory { @Override @@ -41,13 +41,20 @@ public class AddInitMaterializationHook implements AnalysisRuleFactory { } return ctx.root; })), - RuleType.INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK.build( - any().when(LogicalTableSink.class::isInstance) + RuleType.INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK.build(unboundTableSink() .thenApply(ctx -> { if (ctx.connectContext.getSessionVariable().isEnableDmlMaterializedViewRewrite()) { ctx.statementContext.addPlannerHook(InitConsistentMaterializationContextHook.INSTANCE); } return ctx.root; + })), + RuleType.INIT_MATERIALIZATION_HOOK_FOR_RESULT_SINK.build(unboundResultSink() + .thenApply(ctx -> { + if (ctx.connectContext.getSessionVariable().isEnableMaterializedViewRewrite() + && ctx.connectContext.getState().isQuery()) { + ctx.statementContext.addPlannerHook(InitMaterializationContextHook.INSTANCE); + } + return ctx.root; })) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 755f6aa947..410028ff73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.View; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.nereids.CTEContext; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.PlannerHook; import org.apache.doris.nereids.StatementContext.TableFrom; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundResultSink; @@ -34,6 +35,7 @@ import org.apache.doris.nereids.pattern.MatchingContext; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; import org.apache.doris.nereids.trees.expressions.CTEId; import org.apache.doris.nereids.trees.expressions.SubqueryExpr; import org.apache.doris.nereids.trees.plans.Plan; @@ -195,7 +197,15 @@ public class CollectRelation implements AnalysisRuleFactory { } private void collectMTMVCandidates(TableIf table, CascadesContext cascadesContext) { - if (cascadesContext.getConnectContext().getSessionVariable().enableMaterializedViewRewrite) { + boolean shouldCollect = false; + for (PlannerHook plannerHook : cascadesContext.getStatementContext().getPlannerHooks()) { + // only collect when InitMaterializationContextHook exists in planner hooks + if (plannerHook instanceof InitMaterializationContextHook) { + shouldCollect = true; + break; + } + } + if (shouldCollect) { Set mtmvSet = Env.getCurrentEnv().getMtmvService().getRelationManager() .getAllMTMVs(Lists.newArrayList(new BaseTableInfo(table))); LOG.info("table {} related mv set is {}", new BaseTableInfo(table), mtmvSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5f3bf4c1b1..248290226e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -142,7 +142,6 @@ import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand; @@ -789,9 +788,6 @@ public class StmtExecutor { // t3: observer fe receive editlog creating the table from the master fe syncJournalIfNeeded(); planner = new NereidsPlanner(statementContext); - if (context.getSessionVariable().isEnableMaterializedViewRewrite()) { - statementContext.addPlannerHook(InitMaterializationContextHook.INSTANCE); - } try { planner.plan(parsedStmt, context.getSessionVariable().toThrift()); checkBlockRules();