[test](mtmv)Add tpch test cherry pick to branch 21 (#32611)
* [test](neredis) Add tpch test for query rewrite by materialized view (#30870) query rewrite by materialized view sql is as following q1, q5, q6, q8, q9, q12, q14 the other is not supported now, will be supported later * change code usage
This commit is contained in:
@ -17,19 +17,28 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.analyzer.UnboundResultSink;
|
||||
import org.apache.doris.nereids.analyzer.UnboundTableSink;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The cache for materialized view cache
|
||||
*/
|
||||
@ -61,14 +70,25 @@ public class MTMVCache {
|
||||
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
|
||||
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
|
||||
}
|
||||
Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
|
||||
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
|
||||
unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor<LogicalPlan, Void>() {
|
||||
// convert to table sink to eliminate sort under table sink, because sort under result sink can not be
|
||||
// eliminated
|
||||
@Override
|
||||
public Plan visit(Plan plan, Object context) {
|
||||
if (plan instanceof LogicalResultSink) {
|
||||
return ((LogicalResultSink<Plan>) plan).child();
|
||||
}
|
||||
return super.visit(plan, context);
|
||||
public LogicalPlan visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink,
|
||||
Void context) {
|
||||
return new UnboundTableSink<>(mtmv.getFullQualifiers(),
|
||||
mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()),
|
||||
Lists.newArrayList(),
|
||||
mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()),
|
||||
unboundResultSink.child());
|
||||
}
|
||||
}, null);
|
||||
Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
|
||||
// eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view
|
||||
Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter<Object>() {
|
||||
@Override
|
||||
public Plan visitLogicalTableSink(LogicalTableSink<? extends Plan> logicalTableSink, Object context) {
|
||||
return logicalTableSink.child().accept(this, context);
|
||||
}
|
||||
}, null);
|
||||
return new MTMVCache(mvPlan, originPlan);
|
||||
|
||||
@ -372,6 +372,7 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
|
||||
@Override
|
||||
protected boolean checkPattern(StructInfo structInfo) {
|
||||
PlanCheckContext checkContext = PlanCheckContext.of(SUPPORTED_JOIN_TYPE_SET);
|
||||
// if query or mv contains more then one top aggregate, should fail
|
||||
return structInfo.getTopPlan().accept(StructInfo.PLAN_PATTERN_CHECKER, checkContext)
|
||||
&& checkContext.isContainsTopAggregate() && checkContext.getTopAggregateNum() <= 1;
|
||||
}
|
||||
|
||||
@ -42,7 +42,7 @@ import java.util.Set;
|
||||
/**
|
||||
* logical hive table sink for insert command
|
||||
*/
|
||||
public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
|
||||
public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableSink<CHILD_TYPE>
|
||||
implements Sink, PropagateFuncDeps {
|
||||
// bound data sink
|
||||
private final HMSExternalDatabase database;
|
||||
|
||||
@ -78,11 +78,11 @@ public interface SinkVisitor<R, C> {
|
||||
}
|
||||
|
||||
default R visitLogicalOlapTableSink(LogicalOlapTableSink<? extends Plan> olapTableSink, C context) {
|
||||
return visitLogicalSink(olapTableSink, context);
|
||||
return visitLogicalTableSink(olapTableSink, context);
|
||||
}
|
||||
|
||||
default R visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
|
||||
return visitLogicalSink(hiveTableSink, context);
|
||||
return visitLogicalTableSink(hiveTableSink, context);
|
||||
}
|
||||
|
||||
default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) {
|
||||
@ -107,11 +107,11 @@ public interface SinkVisitor<R, C> {
|
||||
}
|
||||
|
||||
default R visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, C context) {
|
||||
return visitPhysicalSink(olapTableSink, context);
|
||||
return visitPhysicalTableSink(olapTableSink, context);
|
||||
}
|
||||
|
||||
default R visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
|
||||
return visitPhysicalSink(hiveTableSink, context);
|
||||
return visitPhysicalTableSink(hiveTableSink, context);
|
||||
}
|
||||
|
||||
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
|
||||
|
||||
Reference in New Issue
Block a user