[enhance](mtmv)MTMV supports Hive multi-level partitioning (#31060)

Issue Number: close #xxx

For example, the hive table is partitioned by `date` and `region`, with the following 6 partitions
```
20200101
        beijing
        shanghai
20200102
        beijing
        shanghai
20200103
        beijing
        shanghai
```

If the MTMV is partitioned by `date`, then the MTMV will have three partitions: 20200101, 202000102, 20200103

If the MTMV is partitioned by `region`, then the MTMV will have two partitions: beijing, shanghai
This commit is contained in:
zhangdong
2024-02-25 17:47:19 +08:00
committed by yiguolei
parent 7a1caf4718
commit 7a9fe5d275
20 changed files with 698 additions and 146 deletions

View File

@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.util.List;
@ -223,4 +224,26 @@ public class PartitionKeyDesc {
})).append(")");
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitionKeyDesc that = (PartitionKeyDesc) o;
return Objects.equal(lowerValues, that.lowerValues)
&& Objects.equal(upperValues, that.upperValues)
&& Objects.equal(inValues, that.inValues)
&& partitionKeyValueType == that.partitionKeyValueType
&& Objects.equal(timeInterval, that.timeInterval)
&& Objects.equal(timeType, that.timeType);
}
@Override
public int hashCode() {
return Objects.hashCode(lowerValues, upperValues, inValues, partitionKeyValueType, timeInterval, timeType);
}
}

View File

@ -20,6 +20,8 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import com.google.common.base.Objects;
public class PartitionValue {
public static final PartitionValue MAX_VALUE = new PartitionValue();
@ -69,4 +71,22 @@ public class PartitionValue {
public boolean isHiveDefaultPartition() {
return isHiveDefaultPartition;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitionValue that = (PartitionValue) o;
return isHiveDefaultPartition == that.isHiveDefaultPartition
&& Objects.equal(value, that.value);
}
@Override
public int hashCode() {
return Objects.hashCode(value, isHiveDefaultPartition);
}
}

View File

@ -19,14 +19,17 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.common.AnalysisException;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
@ -80,6 +83,22 @@ public class ListPartitionItem extends PartitionItem {
return PartitionKeyDesc.createIn(inValues);
}
@Override
public PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException {
List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
.collect(Collectors.toList());
Set<List<PartitionValue>> res = Sets.newHashSet();
for (List<PartitionValue> values : inValues) {
if (values.size() <= pos) {
throw new AnalysisException(
String.format("toPartitionKeyDesc IndexOutOfBounds, values: %s, pos: %d", values.toString(),
pos));
}
res.add(Lists.newArrayList(values.get(pos)));
}
return PartitionKeyDesc.createIn(Lists.newArrayList(res));
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());

View File

@ -17,6 +17,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
@ -28,6 +29,7 @@ import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
@ -38,6 +40,7 @@ import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
@ -47,7 +50,9 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -243,6 +248,69 @@ public class MTMV extends OlapTable {
return refreshSnapshot;
}
/**
* generateMvPartitionDescs
*
* @return mvPartitionId ==> mvPartitionKeyDesc
*/
public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() {
Map<Long, PartitionItem> mtmvItems = getPartitionItems();
Map<Long, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
}
return result;
}
/**
* generateRelatedPartitionDescs
* <p>
* Different partitions may generate the same PartitionKeyDesc through logical calculations
* (such as selecting only one column, or rolling up partitions), so it is a one to many relationship
*
* @return related PartitionKeyDesc ==> relatedPartitionIds
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getPartitionItems();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
if (res.containsKey(partitionKeyDesc)) {
res.get(partitionKeyDesc).add(entry.getKey());
} else {
res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey()));
}
}
return res;
}
/**
* Calculate the partition and associated partition mapping relationship of the MTMV
* It is the result of real-time comparison calculation, so there may be some costs,
* so it should be called with caution
*
* @return mvPartitionId ==> relationPartitionIds
* @throws AnalysisException
*/
public Map<Long, Set<Long>> calculatePartitionMappings() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
Map<Long, Set<Long>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = generateRelatedPartitionDescs();
Map<Long, PartitionItem> mvPartitionItems = getPartitionInfo().getIdToItem(false);
for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
}
return res;
}
public void readMvLock() {
this.mvRwLock.readLock().lock();
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Writable;
import java.util.Comparator;
@ -36,4 +37,13 @@ public abstract class PartitionItem implements Comparable<PartitionItem>, Writab
}
public abstract PartitionKeyDesc toPartitionKeyDesc();
/**
* Generate PartitionKeyDesc using only the posth PartitionValue
*
* @param pos
* @return
* @throws AnalysisException
*/
public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException;
}

View File

@ -53,6 +53,13 @@ public class RangePartitionItem extends PartitionItem {
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
}
@Override
public PartitionKeyDesc toPartitionKeyDesc(int pos) {
// MTMV do not allow base tables with partition type range to have multiple partition columns,
// so pos is ignored here
return toPartitionKeyDesc();
}
@Override
public void write(DataOutput out) throws IOException {
RangeUtils.writeRange(out, partitionKeyRange);

View File

@ -220,7 +220,7 @@ public class PartitionsProcDir implements ProcDirInterface {
return result;
}
private List<List<Comparable>> getPartitionInfos() {
private List<List<Comparable>> getPartitionInfos() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(olapTable);
Preconditions.checkState(olapTable.isManagedTable());
@ -244,6 +244,12 @@ public class PartitionsProcDir implements ProcDirInterface {
}
Joiner joiner = Joiner.on(", ");
Map<Long, List<String>> partitionsUnSyncTables = null;
if (olapTable instanceof MTMV) {
partitionsUnSyncTables = MTMVPartitionUtil
.getPartitionsUnSyncTables((MTMV) olapTable, partitionIds);
}
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
@ -308,15 +314,9 @@ public class PartitionsProcDir implements ProcDirInterface {
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
if (olapTable instanceof MTMV) {
try {
List<String> partitionUnSyncTables = MTMVPartitionUtil
.getPartitionUnSyncTables((MTMV) olapTable, partitionId);
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
partitionInfo.add(partitionUnSyncTables.toString());
} catch (AnalysisException e) {
partitionInfo.add(false);
partitionInfo.add(e.getMessage());
}
List<String> partitionUnSyncTables = partitionsUnSyncTables.get(partitionId);
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
partitionInfo.add(partitionUnSyncTables.toString());
} else {
partitionInfo.add(true);
partitionInfo.add(FeConstants.null_string);

View File

@ -169,9 +169,10 @@ public class MTMVTask extends AbstractTask {
// To be completely consistent with hive, you need to manually refresh the cache
// refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
MTMVPartitionUtil.alignMvPartition(mtmv);
}
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings();
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(partitionMappings);
this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
@ -191,7 +192,8 @@ public class MTMVTask extends AbstractTask {
// need get names before exec
List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds);
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds,
partitionMappings);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
@ -389,7 +391,7 @@ public class MTMVTask extends AbstractTask {
}
}
public List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
public List<Long> calculateNeedRefreshPartitions(Map<Long, Set<Long>> partitionMappings) throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
@ -402,7 +404,8 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables());
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(),
partitionMappings);
if (fresh) {
return Lists.newArrayList();
}
@ -416,7 +419,7 @@ public class MTMVTask extends AbstractTask {
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables());
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables(), partitionMappings);
}
public MTMVTaskContext getTaskContext() {

View File

@ -89,6 +89,19 @@ public class MTMVPartitionInfo {
this.partitionCol = partitionCol;
}
/**
* Get the position of relatedCol in the relatedTable partition column
*
* @return
* @throws AnalysisException
*/
public int getRelatedColPos() throws AnalysisException {
if (partitionType == MTMVPartitionType.SELF_MANAGE) {
throw new AnalysisException("partitionType is: " + partitionType);
}
return MTMVPartitionUtil.getPos(getRelatedTable(), relatedCol);
}
@Override
public String toString() {
return "MTMVPartitionInfo{"

View File

@ -22,6 +22,7 @@ import org.apache.doris.analysis.AllPartitionDesc;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
@ -35,11 +36,13 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -58,27 +61,26 @@ public class MTMVPartitionUtil {
*
* @param mtmv
* @param partitionId
* @param relatedPartitionIds
* @param tables
* @param excludedTriggerTables
* @return
* @throws AnalysisException
*/
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
// if follow base table, not need compare with related table, only should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
long relatedPartitionId = getExistPartitionId(item,
relatedPartitionItems);
if (relatedPartitionId == -1L) {
LOG.warn("can not found related partition: " + partitionId);
if (CollectionUtils.isEmpty(relatedPartitionIds)) {
LOG.warn("can not found related partition, partitionId: {}, mtmvName: {}, relatedTableName: {}",
partitionId, mtmv.getName(), relatedTable.getName());
return false;
}
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, relatedTable, relatedPartitionIds);
}
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables);
@ -88,26 +90,24 @@ public class MTMVPartitionUtil {
* Align the partitions of mtmv and related tables, delete more and add less
*
* @param mtmv
* @param relatedTable
* @throws DdlException
* @throws AnalysisException
*/
public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable)
public static void alignMvPartition(MTMV mtmv)
throws DdlException, AnalysisException {
Map<Long, PartitionItem> relatedTableItems = Maps.newHashMap(relatedTable.getPartitionItems());
Map<Long, PartitionItem> mtmvItems = Maps.newHashMap(mtmv.getPartitionItems());
Map<Long, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Set<PartitionKeyDesc> relatedPartitionDescs = mtmv.generateRelatedPartitionDescs().keySet();
// drop partition of mtmv
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
if (partitionId == -1L) {
for (Entry<Long, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) {
if (!relatedPartitionDescs.contains(entry.getValue())) {
dropPartition(mtmv, entry.getKey());
}
}
// add partition for mtmv
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
if (partitionId == -1L) {
addPartition(mtmv, entry.getValue());
HashSet<PartitionKeyDesc> mtmvPartitionDescsSet = Sets.newHashSet(mtmvPartitionDescs.values());
for (PartitionKeyDesc desc : relatedPartitionDescs) {
if (!mtmvPartitionDescsSet.contains(desc)) {
addPartition(mtmv, desc);
}
}
}
@ -117,19 +117,19 @@ public class MTMVPartitionUtil {
*
* @param relatedTable
* @param tableProperties
* @param relatedCol
* @return
* @throws AnalysisException
*/
public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable,
Map<String, String> tableProperties) throws AnalysisException {
Map<String, String> tableProperties, String relatedCol) throws AnalysisException {
HashMap<String, String> partitionProperties = Maps.newHashMap();
List<AllPartitionDesc> res = Lists.newArrayList();
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems();
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
PartitionKeyDesc oldPartitionKeyDesc = entry.getValue().toPartitionKeyDesc();
Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol);
for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) {
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
generatePartitionName(oldPartitionKeyDesc),
oldPartitionKeyDesc, partitionProperties);
generatePartitionName(partitionKeyDesc),
partitionKeyDesc, partitionProperties);
// mtmv can only has one partition col
singlePartitionDesc.analyze(1, tableProperties);
res.add(singlePartitionDesc);
@ -137,6 +137,29 @@ public class MTMVPartitionUtil {
return res;
}
private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol)
throws AnalysisException {
int pos = getPos(relatedTable, relatedCol);
Set<PartitionKeyDesc> res = Sets.newHashSet();
for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItems().entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos);
res.add(partitionKeyDesc);
}
return res;
}
public static int getPos(MTMVRelatedTableIf relatedTable, String relatedCol) throws AnalysisException {
List<Column> partitionColumns = relatedTable.getPartitionColumns();
for (int i = 0; i < partitionColumns.size(); i++) {
if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) {
return i;
}
}
throw new AnalysisException(
String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol,
partitionColumns));
}
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (Long partitionId : ids) {
@ -166,7 +189,7 @@ public class MTMVPartitionUtil {
return false;
}
try {
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet());
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), mtmv.calculatePartitionMappings());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@ -179,14 +202,17 @@ public class MTMVPartitionUtil {
* @param mtmv
* @param tables
* @param excludeTables
* @param partitionMappings
* @return
* @throws AnalysisException
*/
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables)
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables,
Map<Long, Set<Long>> partitionMappings)
throws AnalysisException {
Collection<Partition> partitions = mtmv.getPartitions();
for (Partition partition : partitions) {
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) {
if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), tables,
excludeTables)) {
return false;
}
}
@ -194,14 +220,25 @@ public class MTMVPartitionUtil {
}
/**
* get not sync tables
* getPartitionsUnSyncTables
*
* @param mtmv
* @param partitionId
* @return
* @param partitionIds
* @return partitionId ==> UnSyncTableNames
* @throws AnalysisException
*/
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds)
throws AnalysisException {
Map<Long, List<String>> res = Maps.newHashMap();
Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings();
for (Long partitionId : partitionIds) {
res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionId, partitionMappings.get(partitionId)));
}
return res;
}
private static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds)
throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
TableIf table = MTMVUtil.getTable(baseTableInfo);
@ -214,15 +251,11 @@ public class MTMVPartitionUtil {
}
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems();
long relatedPartitionId = getExistPartitionId(item,
relatedPartitionItems);
if (relatedPartitionId == -1L) {
if (CollectionUtils.isEmpty(relatedPartitionIds)) {
throw new AnalysisException("can not found related partition");
}
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf,
relatedPartitionId);
boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, mtmvRelatedTableIf,
relatedPartitionIds);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
@ -242,12 +275,13 @@ public class MTMVPartitionUtil {
* @param baseTables
* @return
*/
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) {
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables,
Map<Long, Set<Long>> partitionMappings) {
Collection<Partition> allPartitions = mtmv.getPartitions();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partition.getId());
}
@ -260,27 +294,33 @@ public class MTMVPartitionUtil {
}
/**
* compare last update time of mtmvPartition and tablePartition
* Compare the current and last updated partition (or table) snapshot of the associated partition (or table)
*
* @param mtmv
* @param mtmvPartitionId
* @param relatedTable
* @param relatedPartitionId
* @param relatedPartitionIds
* @return
* @throws AnalysisException
*/
public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
public static boolean isSyncWithPartitions(MTMV mtmv, Long mtmvPartitionId,
MTMVRelatedTableIf relatedTable,
Long relatedPartitionId) throws AnalysisException {
Set<Long> relatedPartitionIds) throws AnalysisException {
if (!relatedTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
return mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot);
for (Long relatedPartitionId : relatedPartitionIds) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
return false;
}
}
return true;
}
/**
@ -323,12 +363,11 @@ public class MTMVPartitionUtil {
* add partition for mtmv like relatedPartitionId of relatedTable
*
* @param mtmv
* @param partitionItem
* @param oldPartitionKeyDesc
* @throws DdlException
*/
private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
private static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc)
throws DdlException {
PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc();
Map<String, String> partitionProperties = Maps.newHashMap();
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
generatePartitionName(oldPartitionKeyDesc),
@ -339,23 +378,6 @@ public class MTMVPartitionUtil {
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
}
/**
* compare PartitionItem and return equals partitionId
* if not found, return -1L
*
* @param target
* @param sources
* @return
*/
private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) {
for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
if (target.equals(entry.getValue())) {
return entry.getKey();
}
}
return -1L;
}
/**
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
*
@ -410,27 +432,35 @@ public class MTMVPartitionUtil {
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
}
/**
* Generate updated snapshots of partitions to determine if they are synchronized
*
* @param mtmv
* @param baseTables
* @param partitionIds
* @param partitionMappings
* @return
* @throws AnalysisException
*/
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
Set<BaseTableInfo> baseTables, Set<Long> partitionIds,
Map<Long, Set<Long>> partitionMappings)
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (Long partitionId : partitionIds) {
res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId));
res.put(mtmv.getPartition(partitionId).getName(),
generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId)));
}
return res;
}
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
Set<BaseTableInfo> baseTables, Long partitionId)
Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds)
throws AnalysisException {
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
mtmv.getPartitionItems().get(partitionId),
relatedTable);
for (Long relatedPartitionId : relatedPartitionIds) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
@ -451,18 +481,4 @@ public class MTMVPartitionUtil {
}
return refreshPartitionSnapshot;
}
private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
MTMVRelatedTableIf relatedTable) {
List<Long> res = Lists.newArrayList();
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
if (mtmvPartitionItem.equals(entry.getValue())) {
res.add(entry.getKey());
// current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table
return res;
}
}
return res;
}
}

View File

@ -31,6 +31,8 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MTMVRewriteUtil {
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
@ -64,6 +66,7 @@ public class MTMVRewriteUtil {
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
return res;
}
Map<Long, Set<Long>> partitionMappings = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
@ -73,7 +76,11 @@ public class MTMVRewriteUtil {
continue;
}
try {
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(),
if (partitionMappings == null) {
partitionMappings = mtmv.calculatePartitionMappings();
}
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(),
partitionMappings.get(partition.getId()), mtmvRelation.getBaseTables(),
Sets.newHashSet())) {
res.add(partition);
}

View File

@ -35,6 +35,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
@ -304,8 +305,9 @@ public class CreateMTMVInfo {
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (partitionColumnNames.size() != 1) {
throw new AnalysisException("base table for partitioning only support single column.");
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
@ -322,7 +324,7 @@ public class CreateMTMVInfo {
List<AllPartitionDesc> allPartitionDescs = null;
try {
allPartitionDescs = MTMVPartitionUtil
.getPartitionDescsByRelatedTable(relatedTable, properties);
.getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("getPartitionDescsByRelatedTable failed", e);
}