Pick some pr to branch 21 #42279 #44164 (#44369)

This commit is contained in:
seawinde
2024-11-23 03:28:38 +08:00
committed by GitHub
parent 92a0919e9c
commit 5e9bda678c
9 changed files with 200 additions and 68 deletions

View File

@ -244,25 +244,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
} else {
// it's a duplicate, unique or hash distribution agg table
// add delete sign filter on olap scan if needed
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
&& !ConnectContext.get().getSessionVariable().skipDeleteSign()) {
// table qualifier is catalog.db.table, we make db.table.column
Slot deleteSlot = null;
for (Slot slot : scan.getOutput()) {
if (slot.getName().equals(Column.DELETE_SIGN)) {
deleteSlot = slot;
break;
}
}
Preconditions.checkArgument(deleteSlot != null);
Expression conjunct = new EqualTo(new TinyIntLiteral((byte) 0), deleteSlot);
if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
scan = scan.withPreAggStatus(
PreAggStatus.off(Column.DELETE_SIGN + " is used as conjuncts."));
}
return new LogicalFilter<>(Sets.newHashSet(conjunct), scan);
}
return scan;
return checkAndAddDeleteSignFilter(scan, ConnectContext.get(), (OlapTable) table);
}
}
@ -370,6 +352,32 @@ public class BindRelation extends OneAnalysisRuleFactory {
return Optional.empty();
}
/**
* Add delete sign filter on olap scan if need.
*/
public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, ConnectContext connectContext,
OlapTable olapTable) {
if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign()
&& !connectContext.getSessionVariable().skipDeleteSign()) {
// table qualifier is catalog.db.table, we make db.table.column
Slot deleteSlot = null;
for (Slot slot : scan.getOutput()) {
if (slot.getName().equals(Column.DELETE_SIGN)) {
deleteSlot = slot;
break;
}
}
Preconditions.checkArgument(deleteSlot != null);
Expression conjunct = new EqualTo(new TinyIntLiteral((byte) 0), deleteSlot);
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
scan = scan.withPreAggStatus(PreAggStatus.off(
Column.DELETE_SIGN + " is used as conjuncts."));
}
return new LogicalFilter<>(Sets.newHashSet(conjunct), scan);
}
return scan;
}
private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelation,
List<String> qualifiedTableName, CascadesContext cascadesContext) {
// for create view stmt replace tableName to ctl.db.tableName

View File

@ -234,7 +234,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan();
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
@ -254,15 +254,14 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
}
rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
}
boolean checkResult = rewriteQueryByViewPreCheck(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext);
if (!checkResult) {
continue;
}
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
@ -374,9 +373,9 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
@ -527,6 +526,16 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
return Pair.of(mvPartitionNeedRemoveNameMap, baseTablePartitionNeedUnionNameMap);
}
/**
* Query rewrite result may output origin plan , this will cause loop.
* if return origin plan, need add check hear.
*/
protected boolean rewriteQueryByViewPreCheck(MatchMode matchMode, StructInfo queryStructInfo,
StructInfo viewStructInfo, SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan,
MaterializationContext materializationContext) {
return true;
}
/**
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@ -56,8 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, cascadesContext),
cascadesContext, structInfo);
super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}
@ -67,7 +67,8 @@ public class AsyncMaterializationContext extends MaterializationContext {
@Override
Plan doGenerateScanPlan(CascadesContext cascadesContext) {
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, cascadesContext);
return MaterializedViewUtils.generateMvScanPlan(this.mtmv, this.mtmv.getBaseIndexId(),
this.mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext);
}
@Override
@ -107,7 +108,8 @@ public class AsyncMaterializationContext extends MaterializationContext {
return Optional.empty();
}
RelationId relationId = null;
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan().collectFirst(LogicalOlapScan.class::isInstance);
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
}
@ -127,7 +129,14 @@ public class AsyncMaterializationContext extends MaterializationContext {
);
}
public Plan getScanPlan() {
@Override
public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) {
// If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent,
// try to get lock which should added before RBO
if (!this.isSuccess()) {
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
}
super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}

View File

@ -105,22 +105,13 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;
StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
// Construct materialization struct info, catch exception which may cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null)
@ -128,10 +119,6 @@ public abstract class MaterializationContext {
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions();
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
scanPlanOutput);
}
}
@ -176,17 +163,19 @@ public abstract class MaterializationContext {
* if MaterializationContext is already rewritten successfully, then should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
* This method should be called when query rewrite successfully
*/
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
public void tryGenerateScanPlan(CascadesContext cascadesContext) {
if (!this.isAvailable()) {
return;
}
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Materialization output expression shuttle, this will be used to expression rewrite
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions,
scanPlanOutput);
// This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
@ -195,6 +184,17 @@ public abstract class MaterializationContext {
this.exprToScanExprMapping = regeneratedMapping;
}
/**
* Should clear scan plan after materializationContext is already rewritten successfully,
* Because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
*/
public void clearScanPlan(CascadesContext cascadesContext) {
this.scanPlan = null;
this.shuttledExprToScanExprMapping = null;
this.exprToScanExprMapping = null;
}
public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping);
}
@ -275,7 +275,11 @@ public abstract class MaterializationContext {
return originalPlan;
}
public Plan getScanPlan() {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
|| this.exprToScanExprMapping == null) {
tryGenerateScanPlan(cascadesContext);
}
return scanPlan;
}

View File

@ -20,7 +20,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.TableIdentifier;
@ -30,6 +30,7 @@ import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.StructInfoMap;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Alias;
@ -212,19 +213,24 @@ public class MaterializedViewUtils {
* when query rewrite, because one plan may hit the materialized view repeatedly and the mv scan output
* should be different
*/
public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cascadesContext) {
return new LogicalOlapScan(
public static Plan generateMvScanPlan(OlapTable table, long indexId,
List<Long> partitionIds,
PreAggStatus preAggStatus,
CascadesContext cascadesContext) {
LogicalOlapScan olapScan = new LogicalOlapScan(
cascadesContext.getStatementContext().getNextRelationId(),
materializedView,
materializedView.getFullQualifiers(),
table,
ImmutableList.of(table.getQualifiedDbName()),
ImmutableList.of(),
materializedView.getPartitionIds(),
materializedView.getBaseIndexId(),
PreAggStatus.on(),
partitionIds,
indexId,
preAggStatus,
ImmutableList.of(),
// this must be empty, or it will be used to sample
ImmutableList.of(),
Optional.empty());
return BindRelation.checkAndAddDeleteSignFilter(olapScan, cascadesContext.getConnectContext(),
olapScan.getTable());
}
/**

View File

@ -75,7 +75,8 @@ public class IdStatisticsMapTest extends SqlTestBase {
.analyze()
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully, so need tmp store
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0).getScanPlan().getOutputSet();
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0)
.getScanPlan(null, c1).getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();

View File

@ -373,3 +373,49 @@
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2
-- !query12_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2
-- !query12_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

View File

@ -1549,7 +1549,7 @@ class Suite implements GroovyInterceptable {
// is_partition_statistics_ready is the bool value which identifying if partition row count is valid or not
// if true, check if chosen by cbo or doesn't check
void mv_rewrite_success(query_sql, mv_name, sync_cbo_rewrite = enable_sync_mv_cost_based_rewrite(),
is_partition_statistics_ready = true) {
is_partition_statistics_ready = true) {
logger.info("query_sql = " + query_sql + ", mv_name = " + mv_name + ", sync_cbo_rewrite = " +sync_cbo_rewrite
+ ", is_partition_statistics_ready = " + is_partition_statistics_ready)
if (!is_partition_statistics_ready) {
@ -1575,7 +1575,7 @@ class Suite implements GroovyInterceptable {
// is_partition_statistics_ready is the bool value which identifying if partition row count is valid or not
// if true, check if chosen by cbo or doesn't check
void mv_rewrite_all_success( query_sql, mv_names, sync_cbo_rewrite = enable_sync_mv_cost_based_rewrite(),
is_partition_statistics_ready = true) {
is_partition_statistics_ready = true) {
logger.info("query_sql = " + query_sql + ", mv_names = " + mv_names + ", sync_cbo_rewrite = " +sync_cbo_rewrite
+ ", is_partition_statistics_ready = " + is_partition_statistics_ready)
if (!is_partition_statistics_ready) {
@ -1613,7 +1613,7 @@ class Suite implements GroovyInterceptable {
// is_partition_statistics_ready is the bool value which identifying if partition row count is valid or not
// if true, check if chosen by cbo or doesn't check
void mv_rewrite_any_success(query_sql, mv_names, sync_cbo_rewrite = enable_sync_mv_cost_based_rewrite(),
is_partition_statistics_ready = true) {
is_partition_statistics_ready = true) {
logger.info("query_sql = " + query_sql + ", mv_names = " + mv_names + ", sync_cbo_rewrite = " +sync_cbo_rewrite
+ ", is_partition_statistics_ready = " + is_partition_statistics_ready)
if (!is_partition_statistics_ready) {
@ -1649,7 +1649,7 @@ class Suite implements GroovyInterceptable {
// multi mv part in rewrite process, all rewrte success without check if chosen by cbo
// sync_cbo_rewrite is the bool value which control sync mv is use cbo based mv rewrite
void mv_rewrite_all_success_without_check_chosen(query_sql, mv_names,
sync_cbo_rewrite = enable_sync_mv_cost_based_rewrite()){
sync_cbo_rewrite = enable_sync_mv_cost_based_rewrite()){
logger.info("query_sql = " + query_sql + ", mv_names = " + mv_names)
if (!sync_cbo_rewrite) {
explain {

View File

@ -759,4 +759,53 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""
def mv12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""
def query12_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code <> '91'
union all
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code = '92';
"""
order_qt_query12_0_before "${query12_0}"
async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0")
order_qt_query12_0_after "${query12_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0"""
}