[feature](mtmv) create mtmv support partitions rollup (#31812)

if create MTMV `date_trunc(`xxx`,'month')`
when related table is `range` partition,and have 3 partitions:
```
20200101-20200102
20200102-20200103
20200201-20200202
```
then MTMV will have 2 partitions:
```
20200101-20200201
20200201-20200301
```

when related table is `list` partition,and have 3 partitions:
```
(20200101,20200102)
(20200103)
(20200201)
```
then MTMV will have 2 partitions:
```
(20200101,20200102,20200103)
(20200201)
```
This commit is contained in:
zhangdong
2024-05-14 10:46:29 +08:00
committed by yiguolei
parent f7801948ad
commit 30a036e7a4
26 changed files with 1808 additions and 382 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
@ -38,7 +39,6 @@ import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
@ -51,7 +51,6 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@ -313,35 +312,6 @@ public class MTMV extends OlapTable {
return result;
}
/**
* generateRelatedPartitionDescs
* <p>
* Different partitions may generate the same PartitionKeyDesc through logical calculations
* (such as selecting only one column, or rolling up partitions), so it is a one to many relationship
*
* @return related PartitionKeyDesc ==> relatedPartitionIds
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable()
.getPartitionItemsByTimeFilter(relatedColPos,
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
if (res.containsKey(partitionKeyDesc)) {
res.get(partitionKeyDesc).add(entry.getKey());
} else {
res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey()));
}
}
return res;
}
/**
* Calculate the partition and associated partition mapping relationship of the MTMV
* It is the result of real-time comparison calculation, so there may be some costs,
@ -354,13 +324,19 @@ public class MTMV extends OlapTable {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
long start = System.currentTimeMillis();
Map<Long, Set<Long>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = generateRelatedPartitionDescs();
Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]",
System.currentTimeMillis() - start, name);
}
return res;
}

View File

@ -170,7 +170,7 @@ public class MTMVTask extends AbstractTask {
// Now, the MTMV first ensures consistency with the data in the cache.
// To be completely consistent with hive, you need to manually refresh the cache
// refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings();
@ -225,7 +225,7 @@ public class MTMVTask extends AbstractTask {
lastQueryId = DebugUtil.printId(queryId);
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey);
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
@ -400,7 +400,7 @@ public class MTMVTask extends AbstractTask {
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {
Map<TableIf, String> tableWithPartKey = Maps.newHashMap();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
tableWithPartKey
.put(mtmv.getMvPartitionInfo().getRelatedTable(), mtmv.getMvPartitionInfo().getRelatedCol());
}

View File

@ -0,0 +1,228 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.PartitionExprUtil;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeV2Literal;
import org.apache.doris.nereids.trees.expressions.literal.DateV2Literal;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class MTMVPartitionExprDateTrunc implements MTMVPartitionExprService {
private static Set<String> timeUnits = ImmutableSet.of("year", "month", "day");
private String timeUnit;
public MTMVPartitionExprDateTrunc(FunctionCallExpr functionCallExpr) throws AnalysisException {
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
if (paramsExprs.size() != 2) {
throw new AnalysisException("date_trunc params exprs size should be 2.");
}
Expr param = paramsExprs.get(1);
if (!(param instanceof StringLiteral)) {
throw new AnalysisException("date_trunc param of time unit is not string literal.");
}
this.timeUnit = param.getStringValue().toLowerCase();
}
@Override
public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
if (!timeUnits.contains(this.timeUnit)) {
throw new AnalysisException(
String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits));
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType();
if (partitionType == PartitionType.RANGE) {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
if (!partitionColumnType.isDateType()) {
throw new AnalysisException(
"partitionColumnType should be date/datetime "
+ "when PartitionType is range and expr is date_trunc");
}
}
}
@Override
public String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map<String, String> mvProperties)
throws AnalysisException {
String res = null;
Optional<String> dateFormat = getDateFormat(mvProperties);
List<List<PartitionValue>> inValues = partitionKeyDesc.getInValues();
for (int i = 0; i < inValues.size(); i++) {
// mtmv only support one partition column
PartitionValue partitionValue = inValues.get(i).get(0);
if (partitionValue.isNullPartition()) {
throw new AnalysisException("date trunc not support null partition value");
}
String identity = dateTrunc(partitionValue.getStringValue(), dateFormat, false).toString();
if (i == 0) {
res = identity;
} else {
if (!Objects.equals(res, identity)) {
throw new AnalysisException(
String.format("partition values not equal, res: %s, identity: %s", res,
identity));
}
}
}
return res;
}
private Optional<String> getDateFormat(Map<String, String> mvProperties) {
Optional<String> dateFormat =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT))
? Optional.empty()
: Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT));
return dateFormat;
}
@Override
public PartitionKeyDesc generateRollUpPartitionKeyDesc(PartitionKeyDesc partitionKeyDesc,
MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
// mtmv only support one partition column
Preconditions.checkState(partitionKeyDesc.getLowerValues().size() == 1,
"only support one partition column");
DateTimeV2Literal beginTime = dateTrunc(
partitionKeyDesc.getLowerValues().get(0).getStringValue(),
Optional.empty(), false);
PartitionValue lowerValue = new PartitionValue(dateTimeToStr(beginTime, partitionColumnType));
PartitionValue upperValue = getUpperValue(partitionKeyDesc.getUpperValues().get(0), beginTime,
partitionColumnType);
return PartitionKeyDesc.createFixed(
Collections.singletonList(lowerValue),
Collections.singletonList(upperValue));
}
private PartitionValue getUpperValue(PartitionValue upperValue, DateTimeV2Literal beginTruncTime,
Type partitionColumnType) throws AnalysisException {
if (upperValue.isMax()) {
throw new AnalysisException("date trunc not support MAXVALUE partition");
}
// begin time and end time dateTrunc should has same result
DateTimeV2Literal endTruncTime = dateTrunc(upperValue.getStringValue(), Optional.empty(), true);
if (!Objects.equals(beginTruncTime, endTruncTime)) {
throw new AnalysisException(
String.format("partition values not equal, beginTruncTime: %s, endTruncTime: %s", beginTruncTime,
endTruncTime));
}
DateTimeV2Literal endTime = dateIncrement(beginTruncTime);
return new PartitionValue(dateTimeToStr(endTime, partitionColumnType));
}
private DateTimeV2Literal dateTrunc(String value,
Optional<String> dateFormat, boolean isUpper) throws AnalysisException {
DateTimeV2Literal dateTimeLiteral = strToDate(value, dateFormat);
// for (2020-01-31,2020-02-01),if not -1, lower value and upper value will not same after rollup
if (isUpper) {
dateTimeLiteral = (DateTimeV2Literal) DateTimeArithmetic.secondsSub(dateTimeLiteral, new IntegerLiteral(1));
}
Expression expression = DateTimeExtractAndTransform.dateTrunc(dateTimeLiteral, new VarcharLiteral(timeUnit));
if (!(expression instanceof DateTimeV2Literal)) {
throw new AnalysisException("dateTrunc() should return DateLiteral, expression: " + expression);
}
return (DateTimeV2Literal) expression;
}
private DateTimeV2Literal strToDate(String value,
Optional<String> dateFormat) throws AnalysisException {
try {
return new DateTimeV2Literal(value);
} catch (Exception e) {
if (!dateFormat.isPresent()) {
throw e;
}
Expression strToDate = DateTimeExtractAndTransform
.strToDate(new VarcharLiteral(value),
new VarcharLiteral(dateFormat.get()));
if (strToDate instanceof DateV2Literal) {
DateV2Literal dateV2Literal = (DateV2Literal) strToDate;
return new DateTimeV2Literal(dateV2Literal.getYear(), dateV2Literal.getMonth(), dateV2Literal.getDay(),
0, 0, 0);
} else if (strToDate instanceof DateTimeV2Literal) {
return (DateTimeV2Literal) strToDate;
} else {
throw new AnalysisException(
String.format("strToDate failed, stringValue: %s, dateFormat: %s", value,
dateFormat));
}
}
}
private DateTimeV2Literal dateIncrement(DateTimeV2Literal value) throws AnalysisException {
Expression result;
switch (timeUnit) {
case "year":
result = value.plusYears(1L);
break;
case "month":
result = value.plusMonths(1L);
break;
case "day":
result = value.plusDays(1L);
break;
default:
throw new AnalysisException("MTMV partition roll up not support timeUnit: " + timeUnit);
}
if (!(result instanceof DateTimeV2Literal)) {
throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result);
}
return (DateTimeV2Literal) result;
}
private String dateTimeToStr(DateTimeV2Literal literal,
Type partitionColumnType) throws AnalysisException {
if (partitionColumnType.isDate() || partitionColumnType.isDateV2()) {
return String.format(PartitionExprUtil.DATE_FORMATTER, literal.getYear(), literal.getMonth(),
literal.getDay());
} else if (partitionColumnType.isDatetime() || partitionColumnType.isDatetimeV2()) {
return String.format(PartitionExprUtil.DATETIME_FORMATTER,
literal.getYear(), literal.getMonth(), literal.getDay(),
literal.getHour(), literal.getMinute(), literal.getSecond());
} else {
throw new AnalysisException(
"MTMV not support partition with column type : " + partitionColumnType);
}
}
}

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.common.AnalysisException;
/**
* MTMV Partition Expr Factory
*/
public class MTMVPartitionExprFactory {
public static MTMVPartitionExprService getExprService(Expr expr) throws AnalysisException {
if (!(expr instanceof FunctionCallExpr)) {
throw new AnalysisException("now mtmv partition only support FunctionCallExpr");
}
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
String fnName = functionCallExpr.getFnName().getFunction().toLowerCase();
if ("date_trunc".equals(fnName)) {
return new MTMVPartitionExprDateTrunc(functionCallExpr);
}
throw new AnalysisException("MTMV partition not support function name: " + fnName);
}
}

View File

@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.AnalysisException;
import java.util.Map;
/**
* Interface for materialized view partitioning function
*/
public interface MTMVPartitionExprService {
/**
* for list partition, get identity by expr
*
* @param partitionKeyDesc
* @param mvProperties
* @return
* @throws AnalysisException
*/
String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map<String, String> mvProperties)
throws AnalysisException;
/**
* for range partition, get roll up PartitionKeyDesc by expr
*
* @param partitionKeyDesc
* @param mvPartitionInfo
* @return
* @throws AnalysisException
*/
PartitionKeyDesc generateRollUpPartitionKeyDesc(
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo)
throws AnalysisException;
/**
* Check if user input is legal
*
* @param mtmvPartitionInfo
* @throws AnalysisException
*/
void analyze(MTMVPartitionInfo mtmvPartitionInfo) throws AnalysisException;
}

View File

@ -17,10 +17,16 @@
package org.apache.doris.mtmv;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.gson.annotations.SerializedName;
import java.util.List;
/**
* MTMVPartitionInfo
*/
@ -28,17 +34,24 @@ public class MTMVPartitionInfo {
public enum MTMVPartitionType {
FOLLOW_BASE_TABLE,
EXPR,
SELF_MANAGE
}
public static final ImmutableSet<String> MTMV_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder<String>(
String.CASE_INSENSITIVE_ORDER).add("date_trunc")
.build();
@SerializedName("pt")
MTMVPartitionType partitionType;
private MTMVPartitionType partitionType;
@SerializedName("rt")
BaseTableInfo relatedTable;
private BaseTableInfo relatedTable;
@SerializedName("rc")
String relatedCol;
private String relatedCol;
@SerializedName("pc")
String partitionCol;
private String partitionCol;
@SerializedName("expr")
private Expr expr;
public MTMVPartitionInfo() {
}
@ -89,6 +102,14 @@ public class MTMVPartitionInfo {
this.partitionCol = partitionCol;
}
public Expr getExpr() {
return expr;
}
public void setExpr(Expr expr) {
this.expr = expr;
}
/**
* Get the position of relatedCol in the relatedTable partition column
*
@ -99,7 +120,15 @@ public class MTMVPartitionInfo {
if (partitionType == MTMVPartitionType.SELF_MANAGE) {
throw new AnalysisException("partitionType is: " + partitionType);
}
return MTMVPartitionUtil.getPos(getRelatedTable(), relatedCol);
List<Column> partitionColumns = getRelatedTable().getPartitionColumns();
for (int i = 0; i < partitionColumns.size(); i++) {
if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) {
return i;
}
}
throw new AnalysisException(
String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol,
partitionColumns));
}
// toString() is not easy to find where to call the method

View File

@ -27,12 +27,13 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -56,6 +57,18 @@ public class MTMVPartitionUtil {
private static final Pattern PARTITION_NAME_PATTERN = Pattern.compile("[^a-zA-Z0-9,]");
private static final String PARTITION_NAME_PREFIX = "p_";
private static final List<MTMVRelatedPartitionDescGeneratorService> partitionDescGenerators = ImmutableList
.of(
// It is necessary to maintain this order,
// because some impl deal `PartitionItem`, and some impl deal `PartitionDesc`
// for example: if `MTMVRelatedPartitionDescOnePartitionColGenerator` not generate `PartitionDesc`,
// `MTMVRelatedPartitionDescRollUpGenerator` will not have parameter
new MTMVRelatedPartitionDescInitGenerator(),
new MTMVRelatedPartitionDescSyncLimitGenerator(),
new MTMVRelatedPartitionDescOnePartitionColGenerator(),
new MTMVRelatedPartitionDescRollUpGenerator()
);
/**
* Determine whether the partition is sync with retated partition and other baseTables
*
@ -71,7 +84,7 @@ public class MTMVPartitionUtil {
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
// if follow base table, not need compare with related table, only should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
@ -96,7 +109,8 @@ public class MTMVPartitionUtil {
public static void alignMvPartition(MTMV mtmv)
throws DdlException, AnalysisException {
Map<Long, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Set<PartitionKeyDesc> relatedPartitionDescs = mtmv.generateRelatedPartitionDescs().keySet();
Set<PartitionKeyDesc> relatedPartitionDescs = generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(),
mtmv.getMvProperties()).keySet();
// drop partition of mtmv
for (Entry<Long, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) {
if (!relatedPartitionDescs.contains(entry.getValue())) {
@ -115,19 +129,18 @@ public class MTMVPartitionUtil {
/**
* getPartitionDescsByRelatedTable when create MTMV
*
* @param relatedTable
* @param tableProperties
* @param relatedCol
* @param mvPartitionInfo
* @return
* @throws AnalysisException
*/
public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable,
Map<String, String> tableProperties, String relatedCol, Map<String, String> mvProperties)
public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(
Map<String, String> tableProperties, MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties)
throws AnalysisException {
HashMap<String, String> partitionProperties = Maps.newHashMap();
List<AllPartitionDesc> res = Lists.newArrayList();
Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol,
MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
HashMap<String, String> partitionProperties = Maps.newHashMap();
Set<PartitionKeyDesc> relatedPartitionDescs = generateRelatedPartitionDescs(mvPartitionInfo, mvProperties)
.keySet();
for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) {
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
generatePartitionName(partitionKeyDesc),
@ -139,28 +152,18 @@ public class MTMVPartitionUtil {
return res;
}
private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol,
MTMVPartitionSyncConfig config)
throws AnalysisException {
int pos = getPos(relatedTable, relatedCol);
Set<PartitionKeyDesc> res = Sets.newHashSet();
for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItemsByTimeFilter(pos, config).entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos);
res.add(partitionKeyDesc);
public static Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo,
Map<String, String> mvProperties) throws AnalysisException {
long start = System.currentTimeMillis();
RelatedPartitionDescResult result = new RelatedPartitionDescResult();
for (MTMVRelatedPartitionDescGeneratorService service : partitionDescGenerators) {
service.apply(mvPartitionInfo, mvProperties, result);
}
return res;
}
public static int getPos(MTMVRelatedTableIf relatedTable, String relatedCol) throws AnalysisException {
List<Column> partitionColumns = relatedTable.getPartitionColumns();
for (int i = 0; i < partitionColumns.size(); i++) {
if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) {
return i;
}
if (LOG.isDebugEnabled()) {
LOG.debug("generateRelatedPartitionDescs use [{}] mills, mvPartitionInfo is [{}]",
System.currentTimeMillis() - start, mvPartitionInfo);
}
throw new AnalysisException(
String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol,
partitionColumns));
return result.getDescs();
}
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
@ -258,7 +261,7 @@ public class MTMVPartitionUtil {
if (!mtmvRelatedTableIf.needAutoRefresh()) {
continue;
}
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
if (CollectionUtils.isEmpty(relatedPartitionIds)) {
throw new AnalysisException("can not found related partition");
@ -469,7 +472,7 @@ public class MTMVPartitionUtil {
Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds)
throws AnalysisException {
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (Long relatedPartitionId : relatedPartitionIds) {
MTMVSnapshotIf partitionSnapshot = relatedTable
@ -479,7 +482,7 @@ public class MTMVPartitionUtil {
}
}
for (BaseTableInfo baseTableInfo : baseTables) {
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
continue;
}
@ -491,4 +494,14 @@ public class MTMVPartitionUtil {
}
return refreshPartitionSnapshot;
}
public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException {
List<Column> partitionColumns = relatedTable.getPartitionColumns();
for (Column column : partitionColumns) {
if (column.getName().equals(col)) {
return column.getType();
}
}
throw new AnalysisException("can not getPartitionColumnType by:" + col);
}
}

View File

@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.common.AnalysisException;
import java.util.Map;
/**
* Interface for a series of processes to generate PartitionDesc
*/
public interface MTMVRelatedPartitionDescGeneratorService {
/**
* generate related table PartitionDesc
*
* @param mvPartitionInfo PartitionInfo of MTMV
* @param mvProperties properties of MTMV
* @param lastResult the processing result of the previous process
* @throws AnalysisException
*/
void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException;
}

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.common.AnalysisException;
import java.util.Map;
/**
* get all related partition descs
*/
public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartitionDescGeneratorService {
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems());
}
}

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* For example, the related table is partitioned by `date` and `region`, with the following 6 partitions
* <p>
* 20200101 beijing
* 20200101 shanghai
* 20200102 beijing
* 20200102 shanghai
* 20200103 beijing
* 20200103 shanghai
* <p>
* If the MTMV is partitioned by `date`, then the MTMV will have three partitions: 20200101, 202000102, 20200103
* <p>
* If the MTMV is partitioned by `region`, then the MTMV will have two partitions: beijing, shanghai
*/
public class MTMVRelatedPartitionDescOnePartitionColGenerator implements MTMVRelatedPartitionDescGeneratorService {
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return;
}
Map<PartitionKeyDesc, Set<Long>> res = Maps.newHashMap();
Map<Long, PartitionItem> relatedPartitionItems = lastResult.getItems();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
if (res.containsKey(partitionKeyDesc)) {
res.get(partitionKeyDesc).add(entry.getKey());
} else {
res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey()));
}
}
lastResult.setDescs(res);
}
}

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Roll up some partitions into one partition
*/
public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedPartitionDescGeneratorService {
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
if (mvPartitionInfo.getPartitionType() != MTMVPartitionType.EXPR) {
return;
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType();
if (partitionType == PartitionType.RANGE) {
lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo));
} else if (partitionType == PartitionType.LIST) {
lastResult.setDescs(rollUpList(lastResult.getDescs(), mvPartitionInfo, mvProperties));
} else {
throw new AnalysisException("only RANGE/LIST partition support roll up");
}
}
/**
* when related table has 3 partitions:(20200101),(20200102),(20200201)
* <p>
* if expr is `date_trunc(month)`
* then,MTMV will have 2 partitions (20200101,20200102),(20200201)
* <p>
* if expr is `date_trunc(year)`
* then,MTMV will have 1 partitions (20200101,20200102,20200201)
*
* @param relatedPartitionDescs
* @param mvPartitionInfo
* @return
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> rollUpList(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs,
MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties) throws AnalysisException {
Map<String, Set<String>> identityToValues = Maps.newHashMap();
Map<String, Set<Long>> identityToPartitionIds = Maps.newHashMap();
MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr());
for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) {
String rollUpIdentity = exprSerice.getRollUpIdentity(entry.getKey(), mvProperties);
Preconditions.checkNotNull(rollUpIdentity);
if (identityToValues.containsKey(rollUpIdentity)) {
identityToValues.get(rollUpIdentity).addAll(getStringValues(entry.getKey()));
identityToPartitionIds.get(rollUpIdentity).addAll(entry.getValue());
} else {
identityToValues.put(rollUpIdentity, getStringValues(entry.getKey()));
identityToPartitionIds.put(rollUpIdentity, entry.getValue());
}
}
Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap();
for (Entry<String, Set<String>> entry : identityToValues.entrySet()) {
result.put(PartitionKeyDesc.createIn(getPartitionValues(entry.getValue())),
identityToPartitionIds.get(entry.getKey()));
}
return result;
}
private List<List<PartitionValue>> getPartitionValues(Set<String> strings) {
List<List<PartitionValue>> inValues = Lists.newArrayList();
for (String value : strings) {
inValues.add(Lists.newArrayList(new PartitionValue(value)));
}
return inValues;
}
private Set<String> getStringValues(PartitionKeyDesc partitionKeyDesc) {
List<List<PartitionValue>> inValues = partitionKeyDesc.getInValues();
Set<String> res = Sets.newHashSet();
for (List<PartitionValue> list : inValues) {
res.add(list.get(0).getStringValue());
}
return res;
}
/**
* when related table has 3 partitions:(20200101-20200102),(20200102-20200103),(20200201-20200202)
* <p>
* if expr is `date_trunc(month)`
* then,MTMV will have 2 partitions (20200101-20200201),(20200101-20200301)
* <p>
* if expr is `date_trunc(year)`
* then,MTMV will have 1 partitions (20200101-20210101)
*
* @param relatedPartitionDescs
* @param mvPartitionInfo
* @return
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> rollUpRange(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs,
MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap();
MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr());
for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) {
PartitionKeyDesc rollUpDesc = exprSerice.generateRollUpPartitionKeyDesc(entry.getKey(), mvPartitionInfo);
if (result.containsKey(rollUpDesc)) {
result.get(rollUpDesc).addAll(entry.getValue());
} else {
result.put(rollUpDesc, entry.getValue());
}
}
return result;
}
}

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
/**
* Only focus on partial partitions of related tables
*/
public class MTMVRelatedPartitionDescSyncLimitGenerator implements MTMVRelatedPartitionDescGeneratorService {
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
Map<Long, PartitionItem> partitionItems = lastResult.getItems();
MTMVPartitionSyncConfig config = generateMTMVPartitionSyncConfigByProperties(mvProperties);
if (config.getSyncLimit() <= 0) {
return;
}
long nowTruncSubSec = getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit());
Optional<String> dateFormat = config.getDateFormat();
Map<Long, PartitionItem> res = Maps.newHashMap();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) {
if (entry.getValue().isGreaterThanSpecifiedTime(relatedColPos, dateFormat, nowTruncSubSec)) {
res.put(entry.getKey(), entry.getValue());
}
}
lastResult.setItems(res);
}
/**
* Generate MTMVPartitionSyncConfig based on mvProperties
*
* @param mvProperties
* @return
*/
public MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties(
Map<String, String> mvProperties) {
int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1
: Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT));
MTMVPartitionSyncTimeUnit timeUnit =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT))
? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit
.valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase());
Optional<String> dateFormat =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT))
? Optional.empty()
: Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT));
return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat);
}
/**
* Obtain the minimum second from `syncLimit` `timeUnit` ago
*
* @param timeUnit
* @param syncLimit
* @return
* @throws AnalysisException
*/
public long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit)
throws AnalysisException {
if (syncLimit < 1) {
throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit);
}
// get current time
Expression now = DateTimeAcquire.now();
if (!(now instanceof DateTimeLiteral)) {
throw new AnalysisException("now() should return DateTimeLiteral, now: " + now);
}
DateTimeLiteral nowLiteral = (DateTimeLiteral) now;
// date trunc
now = DateTimeExtractAndTransform
.dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name()));
if (!(now instanceof DateTimeLiteral)) {
throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now);
}
nowLiteral = (DateTimeLiteral) now;
// date sub
if (syncLimit > 1) {
nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1);
}
return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue();
}
private DateTimeLiteral dateSub(
org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit,
int num)
throws AnalysisException {
IntegerLiteral integerLiteral = new IntegerLiteral(num);
Expression result;
switch (timeUnit) {
case DAY:
result = DateTimeArithmetic.dateSub(date, integerLiteral);
break;
case YEAR:
result = DateTimeArithmetic.yearsSub(date, integerLiteral);
break;
case MONTH:
result = DateTimeArithmetic.monthsSub(date, integerLiteral);
break;
default:
throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name());
}
if (!(result instanceof DateTimeLiteral)) {
throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result);
}
return (DateTimeLiteral) result;
}
}

View File

@ -24,12 +24,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
/**
@ -44,31 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf {
*/
Map<Long, PartitionItem> getAndCopyPartitionItems();
/**
* Obtain a list of partitions filtered by time
*
* @param pos The position of the partition column to be checked in all partition columns
* @param config
* @return
* @throws AnalysisException
*/
default Map<Long, PartitionItem> getPartitionItemsByTimeFilter(int pos, MTMVPartitionSyncConfig config)
throws AnalysisException {
Map<Long, PartitionItem> partitionItems = getAndCopyPartitionItems();
if (config.getSyncLimit() <= 0) {
return partitionItems;
}
long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit());
Optional<String> dateFormat = config.getDateFormat();
Map<Long, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) {
if (entry.getValue().isGreaterThanSpecifiedTime(pos, dateFormat, nowTruncSubSec)) {
res.put(entry.getKey(), entry.getValue());
}
}
return res;
}
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
*

View File

@ -25,22 +25,15 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeV2Literal;
import org.apache.doris.nereids.trees.expressions.literal.DateV2Literal;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -61,6 +54,20 @@ public class MTMVUtil {
return table;
}
public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) {
TableIf relatedTable = null;
try {
relatedTable = MTMVUtil.getTable(baseTableInfo);
} catch (org.apache.doris.common.AnalysisException e) {
throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e);
}
if (!(relatedTable instanceof MTMVRelatedTableIf)) {
throw new org.apache.doris.nereids.exceptions.AnalysisException(
"base table for partitioning only can be OlapTable or HMSTable");
}
return (MTMVRelatedTableIf) relatedTable;
}
public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
@ -82,64 +89,6 @@ public class MTMVUtil {
return false;
}
/**
* Obtain the minimum second from `syncLimit` `timeUnit` ago
*
* @param timeUnit
* @param syncLimit
* @return
* @throws AnalysisException
*/
public static long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit)
throws AnalysisException {
if (syncLimit < 1) {
throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit);
}
// get current time
Expression now = DateTimeAcquire.now();
if (!(now instanceof DateTimeLiteral)) {
throw new AnalysisException("now() should return DateTimeLiteral, now: " + now);
}
DateTimeLiteral nowLiteral = (DateTimeLiteral) now;
// date trunc
now = DateTimeExtractAndTransform
.dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name()));
if (!(now instanceof DateTimeLiteral)) {
throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now);
}
nowLiteral = (DateTimeLiteral) now;
// date sub
if (syncLimit > 1) {
nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1);
}
return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue();
}
private static DateTimeLiteral dateSub(
org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit,
int num)
throws AnalysisException {
IntegerLiteral integerLiteral = new IntegerLiteral(num);
Expression result;
switch (timeUnit) {
case DAY:
result = DateTimeArithmetic.dateSub(date, integerLiteral);
break;
case YEAR:
result = DateTimeArithmetic.yearsSub(date, integerLiteral);
break;
case MONTH:
result = DateTimeArithmetic.monthsSub(date, integerLiteral);
break;
default:
throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name());
}
if (!(result instanceof DateTimeLiteral)) {
throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result);
}
return (DateTimeLiteral) result;
}
/**
* Convert LiteralExpr to second
*
@ -177,25 +126,4 @@ public class MTMVUtil {
expr.getStringValue(), dateFormat));
}
}
/**
* Generate MTMVPartitionSyncConfig based on mvProperties
*
* @param mvProperties
* @return
*/
public static MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties(
Map<String, String> mvProperties) {
int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1
: Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT));
MTMVPartitionSyncTimeUnit timeUnit =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT))
? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit
.valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase());
Optional<String> dateFormat =
StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT))
? Optional.empty()
: Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT));
return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat);
}
}

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.PartitionItem;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Set;
public class RelatedPartitionDescResult {
// PartitionKeyDesc to relatedTable partition ids(Different partitions may have the same PartitionKeyDesc)
private Map<PartitionKeyDesc, Set<Long>> descs;
private Map<Long, PartitionItem> items;
public RelatedPartitionDescResult() {
this.descs = Maps.newHashMap();
this.items = Maps.newHashMap();
}
public Map<PartitionKeyDesc, Set<Long>> getDescs() {
return descs;
}
public void setDescs(Map<PartitionKeyDesc, Set<Long>> descs) {
this.descs = descs;
}
public Map<Long, PartitionItem> getItems() {
return items;
}
public void setItems(Map<Long, PartitionItem> items) {
this.items = items;
}
}

View File

@ -33,7 +33,6 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
@ -127,6 +126,7 @@ import org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MapLiteralContext;
import org.apache.doris.nereids.DorisParser.MultiStatementsContext;
import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
import org.apache.doris.nereids.DorisParser.MvPartitionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
import org.apache.doris.nereids.DorisParser.NullLiteralContext;
@ -406,6 +406,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.FuncNameInfo;
import org.apache.doris.nereids.trees.plans.commands.info.InPartition;
import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
import org.apache.doris.nereids.trees.plans.commands.info.MTMVPartitionDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
@ -635,22 +636,29 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
desc, properties, logicalPlan, querySql,
new MTMVRefreshInfo(buildMode, refreshMethod, refreshTriggerInfo),
ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols),
visitMTMVPartitionInfo(ctx.partitionKey)
visitMTMVPartitionInfo(ctx.mvPartition())
));
}
/**
* get MTMVPartitionInfo
* get MTMVPartitionDefinition
*
* @param ctx IdentifierContext
* @return MTMVPartitionInfo
* @param ctx MvPartitionContext
* @return MTMVPartitionDefinition
*/
public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) {
public MTMVPartitionDefinition visitMTMVPartitionInfo(MvPartitionContext ctx) {
MTMVPartitionDefinition mtmvPartitionDefinition = new MTMVPartitionDefinition();
if (ctx == null) {
return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE);
mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.SELF_MANAGE);
} else if (ctx.partitionKey != null) {
mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.FOLLOW_BASE_TABLE);
mtmvPartitionDefinition.setPartitionCol(ctx.partitionKey.getText());
} else {
return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE, ctx.getText());
mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.EXPR);
Expression functionCallExpression = visitFunctionCallExpression(ctx.partitionExpr);
mtmvPartitionDefinition.setFunctionCallExpression(functionCallExpression);
}
return mtmvPartitionDefinition;
}
@Override

View File

@ -30,10 +30,8 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
@ -45,14 +43,12 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
@ -78,7 +74,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@ -103,9 +98,10 @@ public class CreateMTMVInfo {
private final List<ColumnDefinition> columns = Lists.newArrayList();
private final List<SimpleColumnDefinition> simpleColumnDefinitions;
private final EnvInfo envInfo;
private final MTMVPartitionInfo mvPartitionInfo;
private final MTMVPartitionDefinition mvPartitionDefinition;
private PartitionDesc partitionDesc;
private MTMVRelation relation;
private MTMVPartitionInfo mvPartitionInfo;
/**
* constructor for create MTMV
@ -116,7 +112,7 @@ public class CreateMTMVInfo {
LogicalPlan logicalQuery, String querySql,
MTMVRefreshInfo refreshInfo,
List<SimpleColumnDefinition> simpleColumnDefinitions,
MTMVPartitionInfo mvPartitionInfo) {
MTMVPartitionDefinition mvPartitionDefinition) {
this.ifNotExists = Objects.requireNonNull(ifNotExists, "require ifNotExists object");
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
this.keys = Utils.copyRequiredList(keys);
@ -130,8 +126,8 @@ public class CreateMTMVInfo {
.requireNonNull(simpleColumnDefinitions, "require simpleColumnDefinitions object");
this.envInfo = new EnvInfo(ConnectContext.get().getCurrentCatalog().getId(),
ConnectContext.get().getCurrentDbId());
this.mvPartitionInfo = Objects
.requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo object");
this.mvPartitionDefinition = Objects
.requireNonNull(mvPartitionDefinition, "require mtmvPartitionInfo object");
}
/**
@ -212,7 +208,9 @@ public class CreateMTMVInfo {
}
getRelation(planner);
getColumns(plan);
analyzePartition(planner, ctx);
this.mvPartitionInfo = mvPartitionDefinition
.analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery);
this.partitionDesc = generatePartitionDesc(ctx);
}
private void getRelation(NereidsPlanner planner) {
@ -235,67 +233,17 @@ public class CreateMTMVInfo {
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
}
private void analyzePartition(NereidsPlanner planner, ConnectContext ctx) {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan);
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable base table for partitioning");
}
TableIf relatedTable = null;
try {
relatedTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!(relatedTable instanceof MTMVRelatedTableIf)) {
throw new AnalysisException("base table for partitioning only can be OlapTable or HMSTable");
}
MTMVRelatedTableIf mtmvBaseRealtedTable = (MTMVRelatedTableIf) relatedTable;
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
partitionDesc = generatePartitionDesc(mtmvBaseRealtedTable, ctx);
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return null;
}
}
private PartitionDesc generatePartitionDesc(MTMVRelatedTableIf relatedTable, ConnectContext ctx) {
MTMVRelatedTableIf relatedTable = MTMVUtil.getRelatedTable(mvPartitionInfo.getRelatedTableInfo());
List<AllPartitionDesc> allPartitionDescs = null;
try {
allPartitionDescs = MTMVPartitionUtil
.getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol(),
mvProperties);
.getPartitionDescsByRelatedTable(properties, mvPartitionInfo, mvProperties);
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("getPartitionDescsByRelatedTable failed", e);
throw new AnalysisException(e.getMessage(), e);
}
if (allPartitionDescs.size() > ctx.getSessionVariable().getCreateTablePartitionMaxNum()) {
throw new AnalysisException(String.format(
@ -316,7 +264,7 @@ public class CreateMTMVInfo {
return null;
}
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("can not generate partitionDesc", e);
throw new AnalysisException(e.getMessage(), e);
}
}
@ -333,7 +281,7 @@ public class CreateMTMVInfo {
throw new AnalysisException("can not contain VIEW");
}
} catch (org.apache.doris.common.AnalysisException e) {
LOG.warn("can not get table, ", e);
LOG.warn(e.getMessage(), e);
}
}
}
@ -362,7 +310,7 @@ public class CreateMTMVInfo {
try {
FeNameFormat.checkColumnName(colName);
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage());
throw new AnalysisException(e.getMessage(), e);
}
if (colNames.contains(colName)) {
throw new AnalysisException("repeat cols:" + colName);

View File

@ -0,0 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Expr.java
// and modified by Doris
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.MTMVPartitionExprFactory;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* MTMVPartitionDefinition
*/
public class MTMVPartitionDefinition {
private MTMVPartitionType partitionType;
private String partitionCol;
private Expression functionCallExpression;
/**
* analyzeAndTransferToMTMVPartitionInfo
*
* @param planner planner
* @param ctx ctx
* @param logicalQuery logicalQuery
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx,
LogicalPlan logicalQuery) {
MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType);
if (this.partitionType == MTMVPartitionType.SELF_MANAGE) {
return mtmvPartitionInfo;
}
String partitionColName;
if (this.partitionType == MTMVPartitionType.EXPR) {
Expr expr;
if (functionCallExpression instanceof UnboundFunction) {
UnboundFunction function = (UnboundFunction) functionCallExpression;
expr = new FunctionCallExpr(function.getName(),
new FunctionParams(convertToLegacyArguments(function.children())));
} else {
throw new AnalysisException(
"unsupported auto partition expr " + functionCallExpression.toString());
}
partitionColName = getColNameFromExpr(expr);
mtmvPartitionInfo.setExpr(expr);
} else {
partitionColName = this.partitionCol;
}
mtmvPartitionInfo.setPartitionCol(partitionColName);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName);
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
if (this.partitionType == MTMVPartitionType.EXPR) {
try {
MTMVPartitionExprFactory.getExprService(mtmvPartitionInfo.getExpr()).analyze(mtmvPartitionInfo);
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
}
return mtmvPartitionInfo;
}
/**
* getColNameFromExpr
*
* @param expr expr
* @return String
*/
public static String getColNameFromExpr(Expr expr) {
if (!(expr instanceof FunctionCallExpr)) {
throw new AnalysisException(
"auto create partition only support function call expr is: "
+ MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS);
}
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
List<Expr> paramsExpr = functionCallExpr.getParams().exprs();
String name = functionCallExpr.getFnName().getFunction();
if (MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS.contains(name)) {
for (Expr param : paramsExpr) {
if (param instanceof SlotRef) {
return ((SlotRef) param).getColumnName();
}
}
throw new AnalysisException("can not find colName");
} else {
throw new AnalysisException(
"auto create partition only support function call expr is: "
+ MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS);
}
}
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
logicalQuery,
String partitionColName) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, mvRewrittenPlan);
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable base table for partitioning");
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.get().getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo.get();
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
}
private static List<Expr> convertToLegacyArguments(List<Expression> children) {
return children.stream().map(child -> {
if (child instanceof UnboundSlot) {
return new SlotRef(null, ((UnboundSlot) child).getName());
} else if (child instanceof Literal) {
return new StringLiteral(((Literal) child).getStringValue());
} else {
throw new AnalysisException("unsupported argument " + child.toString());
}
}).collect(Collectors.toList());
}
public MTMVPartitionType getPartitionType() {
return partitionType;
}
public void setPartitionType(MTMVPartitionType partitionType) {
this.partitionType = partitionType;
}
public String getPartitionCol() {
return partitionCol;
}
public void setPartitionCol(String partitionCol) {
this.partitionCol = partitionCol;
}
public Expression getFunctionCallExpression() {
return functionCallExpression;
}
public void setFunctionCallExpression(Expression functionCallExpression) {
this.functionCallExpression = functionCallExpression;
}
}