cherry pick from master pr: #35897 commitId: 603fa82f
This commit is contained in:
@ -1550,6 +1550,11 @@ public class Env {
|
||||
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
|
||||
}
|
||||
}
|
||||
if (journalVersion <= FeMetaVersion.VERSION_129) {
|
||||
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
|
||||
SessionVariable.ENABLE_MATERIALIZED_VIEW_REWRITE,
|
||||
"true");
|
||||
}
|
||||
}
|
||||
|
||||
getPolicyMgr().createDefaultStoragePolicy();
|
||||
|
||||
@ -279,14 +279,6 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
|
||||
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
|
||||
invalidPartitions;
|
||||
if (partitionNeedUnion && !sessionVariable.isEnableMaterializedViewUnionRewrite()) {
|
||||
// if use invalid partition but not enable union rewrite
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"Partition query used is invalid",
|
||||
() -> String.format("the partition used by query is invalid by materialized view,"
|
||||
+ "invalid partition info query used is %s", finalInvalidPartitions));
|
||||
continue;
|
||||
}
|
||||
if (partitionNeedUnion) {
|
||||
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
|
||||
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.nereids.rules.exploration.mv;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVCache;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
@ -62,12 +61,18 @@ public class InitMaterializationContextHook implements PlannerHook {
|
||||
if (!cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
|
||||
return;
|
||||
}
|
||||
Plan rewritePlan = cascadesContext.getRewritePlan();
|
||||
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), true);
|
||||
// Keep use one connection context when in query, if new connect context,
|
||||
// the ConnectionContext.get() will change
|
||||
collectorContext.setConnectContext(cascadesContext.getConnectContext());
|
||||
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
|
||||
try {
|
||||
Plan rewritePlan = cascadesContext.getRewritePlan();
|
||||
// Keep use one connection context when in query, if new connect context,
|
||||
// the ConnectionContext.get() will change
|
||||
collectorContext.setConnectContext(cascadesContext.getConnectContext());
|
||||
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
|
||||
cascadesContext.getConnectContext().getQueryIdentifier()), e);
|
||||
return;
|
||||
}
|
||||
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
|
||||
if (collectedTables.isEmpty()) {
|
||||
return;
|
||||
@ -77,30 +82,32 @@ public class InitMaterializationContextHook implements PlannerHook {
|
||||
Set<MTMV> availableMTMVs = Env.getCurrentEnv().getMtmvService().getRelationManager()
|
||||
.getAvailableMTMVs(usedBaseTables, cascadesContext.getConnectContext());
|
||||
if (availableMTMVs.isEmpty()) {
|
||||
LOG.warn(String.format("enable materialized view rewrite but availableMTMVs is empty, current queryId "
|
||||
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
|
||||
LOG.debug(String.format("Enable materialized view rewrite but availableMTMVs is empty, current queryId "
|
||||
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
|
||||
return;
|
||||
}
|
||||
for (MTMV materializedView : availableMTMVs) {
|
||||
MTMVCache mtmvCache = null;
|
||||
try {
|
||||
mtmvCache = materializedView.getOrGenerateCache(cascadesContext.getConnectContext());
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("MaterializationContext init mv cache generate fail", e);
|
||||
if (mtmvCache == null) {
|
||||
continue;
|
||||
}
|
||||
// For async materialization context, the cascades context when construct the struct info maybe
|
||||
// different from the current cascadesContext
|
||||
// so regenerate the struct info table bitset
|
||||
StructInfo mvStructInfo = mtmvCache.getStructInfo();
|
||||
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
|
||||
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
|
||||
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
|
||||
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
|
||||
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(),
|
||||
ImmutableList.of(), cascadesContext,
|
||||
mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("MaterializationContext init mv cache generate fail, current queryId is %s",
|
||||
cascadesContext.getConnectContext().getQueryIdentifier()), e);
|
||||
}
|
||||
if (mtmvCache == null) {
|
||||
continue;
|
||||
}
|
||||
// For async materialization context, the cascades context when construct the struct info maybe
|
||||
// different from the current cascadesContext
|
||||
// so regenerate the struct info table bitset
|
||||
StructInfo mvStructInfo = mtmvCache.getStructInfo();
|
||||
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
|
||||
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
|
||||
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
|
||||
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
|
||||
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(), ImmutableList.of(),
|
||||
cascadesContext, mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1709,7 +1709,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE, needForward = true,
|
||||
description = {"是否开启基于结构信息的物化视图透明改写",
|
||||
"Whether to enable materialized view rewriting based on struct info"})
|
||||
public boolean enableMaterializedViewRewrite = false;
|
||||
public boolean enableMaterializedViewRewrite = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ALLOW_MODIFY_MATERIALIZED_VIEW_DATA, needForward = true,
|
||||
description = {"是否允许修改物化视图的数据",
|
||||
@ -1736,7 +1736,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询",
|
||||
"When the materialized view is not enough to provide all the data for the query, "
|
||||
+ "whether to allow the union of the base table and the materialized view to "
|
||||
+ "respond to the query"})
|
||||
+ "respond to the query"}, varType = VariableAnnotation.DEPRECATED)
|
||||
public boolean enableMaterializedViewUnionRewrite = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_NEST_REWRITE, needForward = true,
|
||||
|
||||
Reference in New Issue
Block a user