[fix](mtmv) Fix getting related partition table wrongly when multi base partition table exists (#34781)
Fix getting related partition table wrongly when multi base partition
table exists
such as base table def is as following:
CREATE TABLE `test1` (
`pre_batch_no` VARCHAR(100) NULL COMMENT 'pre_batch_no',
`batch_no` VARCHAR(100) NULL COMMENT 'batch_no',
`vin_type1` VARCHAR(50) NULL COMMENT 'vin',
`upgrade_day` date COMMENT 'upgrade_day'
) ENGINE=OLAP
unique KEY(`pre_batch_no`,`batch_no`, `vin_type1`, `upgrade_day`)
COMMENT 'OLAP'
PARTITION BY RANGE(`upgrade_day`)
(
FROM ("2024-03-20") TO ("2024-03-31") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH(`vin_type1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
CREATE TABLE `test2` (
`batch_no` VARCHAR(100) NULL COMMENT 'batch_no',
`vin_type2` VARCHAR(50) NULL COMMENT 'vin',
`status` VARCHAR(50) COMMENT 'status',
`upgrade_day` date not null COMMENT 'upgrade_day'
) ENGINE=OLAP
Duplicate KEY(`batch_no`,`vin_type2`)
COMMENT 'OLAP'
PARTITION BY RANGE(`upgrade_day`)
(
FROM ("2024-01-01") TO ("2024-01-10") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH(`vin_type2`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
if you create partition mv which partition by ` t1.upgrade_day` as
following it will be successful
select
t1.upgrade_day,
t1.batch_no,
t1.vin_type1
from
(
SELECT
batch_no,
vin_type1,
upgrade_day
FROM
test1
where
batch_no like 'c%'
group by
batch_no,
vin_type1,
upgrade_day
) t1
left join (
select
batch_no,
vin_type2,
status
from
test2
group by
batch_no,
vin_type2,
status
) t2 on t1.vin_type1 = t2.vin_type2;
This commit is contained in:
@ -21,7 +21,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.catalog.constraint.TableIdentifier;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PreAggStatus;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
@ -50,11 +51,12 @@ import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -95,17 +97,29 @@ public class MaterializedViewUtils {
|
||||
if (!columnSlot.isColumnFromTable()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
// check sql pattern
|
||||
IncrementCheckerContext context = new IncrementCheckerContext(columnSlot);
|
||||
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, context);
|
||||
if (context.getPartitionRelatedTableAndColumnList().isEmpty() || !context.isPctPossible()) {
|
||||
// Collect table relation map which is used to identify self join
|
||||
List<CatalogRelation> catalogRelationObjs =
|
||||
materializedViewPlan.collectToList(CatalogRelation.class::isInstance);
|
||||
ImmutableMultimap.Builder<TableIdentifier, CatalogRelation> tableCatalogRelationMultimapBuilder =
|
||||
ImmutableMultimap.builder();
|
||||
for (CatalogRelation catalogRelation : catalogRelationObjs) {
|
||||
tableCatalogRelationMultimapBuilder.put(new TableIdentifier(catalogRelation.getTable()), catalogRelation);
|
||||
}
|
||||
// Check sql pattern
|
||||
IncrementCheckerContext checkContext =
|
||||
new IncrementCheckerContext(columnSlot, tableCatalogRelationMultimapBuilder.build());
|
||||
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, checkContext);
|
||||
Multimap<TableIf, Column> partitionRelatedTableAndColumnMap =
|
||||
checkContext.getPartitionRelatedTableAndColumnMap();
|
||||
if (partitionRelatedTableAndColumnMap.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
// TODO support to return only one related table info, support multi later
|
||||
Pair<TableIf, Column> tableIfColumnPair = context.getPartitionRelatedTableAndColumnList().get(0);
|
||||
return Optional.of(new RelatedTableInfo(new BaseTableInfo(tableIfColumnPair.key()),
|
||||
context.isPctPossible(),
|
||||
tableIfColumnPair.value().getName()));
|
||||
for (Map.Entry<TableIf, Column> entry : partitionRelatedTableAndColumnMap.entries()) {
|
||||
return Optional.of(new RelatedTableInfo(new BaseTableInfo(entry.getKey()), true,
|
||||
entry.getValue().getName()));
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -289,7 +303,6 @@ public class MaterializedViewUtils {
|
||||
public Void visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join,
|
||||
IncrementCheckerContext context) {
|
||||
if (join.isMarkJoin()) {
|
||||
context.setPctPossible(false);
|
||||
return null;
|
||||
}
|
||||
Plan left = join.child(0);
|
||||
@ -301,20 +314,17 @@ public class MaterializedViewUtils {
|
||||
boolean useLeft = leftColumnSet.contains(context.getMvPartitionColumn().getColumn().get());
|
||||
JoinType joinType = join.getJoinType();
|
||||
if (joinType.isInnerJoin() || joinType.isCrossJoin()) {
|
||||
context.setPctPossible(true);
|
||||
} else if (joinType.isLeftJoin()
|
||||
return visit(join, context);
|
||||
} else if ((joinType.isLeftJoin()
|
||||
|| joinType.isLefSemiJoin()
|
||||
|| joinType.isLeftAntiJoin()) {
|
||||
context.setPctPossible(useLeft);
|
||||
} else if (joinType.isRightJoin()
|
||||
|| joinType.isLeftAntiJoin()) && useLeft) {
|
||||
return visit(join.left(), context);
|
||||
} else if ((joinType.isRightJoin()
|
||||
|| joinType.isRightAntiJoin()
|
||||
|| joinType.isRightSemiJoin()) {
|
||||
context.setPctPossible(!useLeft);
|
||||
} else {
|
||||
// un supported join type
|
||||
context.setPctPossible(false);
|
||||
|| joinType.isRightSemiJoin()) && !useLeft) {
|
||||
return visit(join.right(), context);
|
||||
}
|
||||
return visit(join, context);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -324,15 +334,13 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
LogicalCatalogRelation logicalCatalogRelation = (LogicalCatalogRelation) relation;
|
||||
TableIf table = logicalCatalogRelation.getTable();
|
||||
// if self join, can't infer partition column
|
||||
if (!context.getTableIdAndRelationMapping().get(table.getId()).isEmpty()) {
|
||||
context.setPctPossible(false);
|
||||
// if self join, self join can not partition track now, remove the partition column correspondingly
|
||||
if (context.getRelationByTable(table).size() > 1) {
|
||||
context.getPartitionRelatedTableAndColumnMap().removeAll(table);
|
||||
return null;
|
||||
}
|
||||
// record tableId and relation, to check the self join
|
||||
context.addTableIdAndRelation(((LogicalCatalogRelation) relation).getTable().getId(), relation);
|
||||
// TODO: 2024/1/31 support only one partition referenced column, support multi later
|
||||
if (!context.getPartitionRelatedTableAndColumnList().isEmpty()) {
|
||||
if (!context.getPartitionRelatedTableAndColumnMap().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
@ -345,9 +353,9 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
|
||||
Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get();
|
||||
if (partitionColumnSet.contains(mvReferenceColumn)) {
|
||||
if (partitionColumnSet.contains(mvReferenceColumn)
|
||||
&& (!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull())) {
|
||||
context.addTableColumn(table, mvReferenceColumn);
|
||||
context.setPctPossible(!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull());
|
||||
}
|
||||
return visit(relation, context);
|
||||
}
|
||||
@ -357,7 +365,6 @@ public class MaterializedViewUtils {
|
||||
IncrementCheckerContext context) {
|
||||
Set<Expression> groupByExprSet = new HashSet<>(aggregate.getGroupByExpressions());
|
||||
if (groupByExprSet.isEmpty()) {
|
||||
context.setPctPossible(false);
|
||||
return null;
|
||||
}
|
||||
Set<Column> originalGroupbyExprSet = new HashSet<>();
|
||||
@ -367,7 +374,6 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
});
|
||||
if (!originalGroupbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
|
||||
context.setPctPossible(false);
|
||||
return null;
|
||||
}
|
||||
return visit(aggregate, context);
|
||||
@ -379,15 +385,16 @@ public class MaterializedViewUtils {
|
||||
if (windowExpressions.isEmpty()) {
|
||||
return visit(window, context);
|
||||
}
|
||||
windowExpressions.forEach(expr -> checkWindowPartition(expr, context));
|
||||
for (NamedExpression namedExpression : windowExpressions) {
|
||||
if (!checkWindowPartition(namedExpression, context)) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return super.visitLogicalWindow(window, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void visit(Plan plan, IncrementCheckerContext context) {
|
||||
if (!context.isPctPossible()) {
|
||||
return null;
|
||||
}
|
||||
if (plan instanceof LogicalProject
|
||||
|| plan instanceof LogicalFilter
|
||||
|| plan instanceof LogicalJoin
|
||||
@ -397,65 +404,58 @@ public class MaterializedViewUtils {
|
||||
|| plan instanceof LogicalWindow) {
|
||||
return super.visit(plan, context);
|
||||
}
|
||||
context.setPctPossible(false);
|
||||
return null;
|
||||
}
|
||||
|
||||
private void checkWindowPartition(Expression expression, IncrementCheckerContext context) {
|
||||
expression.collectToList(expressionTreeNode -> expressionTreeNode instanceof WindowExpression)
|
||||
.forEach(windowObj -> {
|
||||
WindowExpression windowExpression = (WindowExpression) windowObj;
|
||||
List<Expression> partitionKeys = windowExpression.getPartitionKeys();
|
||||
Set<Column> originalPartitionbyExprSet = new HashSet<>();
|
||||
partitionKeys.forEach(groupExpr -> {
|
||||
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
|
||||
originalPartitionbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
|
||||
}
|
||||
});
|
||||
if (!originalPartitionbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
|
||||
context.setPctPossible(false);
|
||||
}
|
||||
});
|
||||
private boolean checkWindowPartition(Expression expression, IncrementCheckerContext context) {
|
||||
List<Object> windowExpressions =
|
||||
expression.collectToList(expressionTreeNode -> expressionTreeNode instanceof WindowExpression);
|
||||
for (Object windowExpressionObj : windowExpressions) {
|
||||
WindowExpression windowExpression = (WindowExpression) windowExpressionObj;
|
||||
List<Expression> partitionKeys = windowExpression.getPartitionKeys();
|
||||
Set<Column> originalPartitionbyExprSet = new HashSet<>();
|
||||
partitionKeys.forEach(groupExpr -> {
|
||||
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
|
||||
originalPartitionbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
|
||||
}
|
||||
});
|
||||
if (!originalPartitionbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class IncrementCheckerContext {
|
||||
private final SlotReference mvPartitionColumn;
|
||||
private boolean pctPossible = true;
|
||||
private final List<Pair<TableIf, Column>> partitionRelatedTableAndColumnList = new ArrayList<>();
|
||||
// This record the table id and relation mapping, because a table maybe used repeatedly.
|
||||
private final Multimap<Long, LogicalRelation> tableIdAndRelationMapping = HashMultimap.create();
|
||||
private final Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap;
|
||||
private final Multimap<TableIf, Column> partitionRelatedTableAndColumnMap = HashMultimap.create();
|
||||
|
||||
public IncrementCheckerContext(SlotReference mvPartitionColumn) {
|
||||
public IncrementCheckerContext(SlotReference mvPartitionColumn,
|
||||
Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap) {
|
||||
this.mvPartitionColumn = mvPartitionColumn;
|
||||
this.tableAndCatalogRelationMap = tableAndCatalogRelationMap;
|
||||
}
|
||||
|
||||
public SlotReference getMvPartitionColumn() {
|
||||
return mvPartitionColumn;
|
||||
}
|
||||
|
||||
public boolean isPctPossible() {
|
||||
return pctPossible;
|
||||
}
|
||||
|
||||
public void setPctPossible(boolean pctPossible) {
|
||||
this.pctPossible = pctPossible;
|
||||
}
|
||||
|
||||
public void addTableColumn(TableIf relatedTable, Column partitionColumn) {
|
||||
partitionRelatedTableAndColumnList.add(Pair.of(relatedTable, partitionColumn));
|
||||
partitionRelatedTableAndColumnMap.put(relatedTable, partitionColumn);
|
||||
}
|
||||
|
||||
public List<Pair<TableIf, Column>> getPartitionRelatedTableAndColumnList() {
|
||||
return partitionRelatedTableAndColumnList;
|
||||
public Multimap<TableIf, Column> getPartitionRelatedTableAndColumnMap() {
|
||||
return partitionRelatedTableAndColumnMap;
|
||||
}
|
||||
|
||||
public Multimap<Long, LogicalRelation> getTableIdAndRelationMapping() {
|
||||
return tableIdAndRelationMapping;
|
||||
public Collection<CatalogRelation> getRelationByTable(TableIf tableIf) {
|
||||
return tableAndCatalogRelationMap.get(new TableIdentifier(tableIf));
|
||||
}
|
||||
|
||||
public void addTableIdAndRelation(Long tableId, LogicalRelation relation) {
|
||||
tableIdAndRelationMapping.put(tableId, relation);
|
||||
public void addTableAndRelation(TableIf tableIf, CatalogRelation relation) {
|
||||
tableAndCatalogRelationMap.put(new TableIdentifier(tableIf), relation);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user