[enhance](mtmv) mtmv disable hive auto refresh (#30775)
- If the `related table` is `hive`, do not refresh automatically - If the `related table` is `hive`, the partition col is allowed to be `null`. Otherwise, it must be `not null` - add more `ut`
This commit is contained in:
@ -2596,4 +2596,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
return getPartitionOrAnalysisException(partitionId).getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needAutoRefresh() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPartitionColumnAllowNull() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -885,6 +885,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
|
||||
return partitionValuesList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needAutoRefresh() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPartitionColumnAllowNull() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -45,7 +45,7 @@ import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.ListComparator;
|
||||
import org.apache.doris.common.util.OrderByPair;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mtmv.MTMVPartitionUtil;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -309,7 +309,7 @@ public class PartitionsProcDir implements ProcDirInterface {
|
||||
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
|
||||
if (olapTable instanceof MTMV) {
|
||||
try {
|
||||
List<String> partitionUnSyncTables = MTMVUtil
|
||||
List<String> partitionUnSyncTables = MTMVPartitionUtil
|
||||
.getPartitionUnSyncTables((MTMV) olapTable, partitionId);
|
||||
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
|
||||
partitionInfo.add(partitionUnSyncTables.toString());
|
||||
|
||||
@ -23,7 +23,6 @@ import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.RangePartitionInfo;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.ListComparator;
|
||||
@ -91,7 +90,7 @@ public class TablesProcDir implements ProcDirInterface {
|
||||
String partitionKey = FeConstants.null_string;
|
||||
table.readLock();
|
||||
try {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
if (table instanceof OlapTable) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) {
|
||||
partitionNum = olapTable.getPartitions().size();
|
||||
|
||||
@ -18,17 +18,14 @@
|
||||
package org.apache.doris.job.extensions.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
@ -36,6 +33,7 @@ import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.task.AbstractTask;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVPartitionUtil;
|
||||
import org.apache.doris.mtmv.MTMVPlanUtil;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
|
||||
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
|
||||
@ -145,6 +143,13 @@ public class MTMVTask extends AbstractTask {
|
||||
this.taskContext = Objects.requireNonNull(taskContext);
|
||||
}
|
||||
|
||||
// only for test
|
||||
public MTMVTask(MTMV mtmv, MTMVRelation relation, MTMVTaskContext taskContext) {
|
||||
this.mtmv = mtmv;
|
||||
this.relation = relation;
|
||||
this.taskContext = taskContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws JobException {
|
||||
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
|
||||
@ -161,10 +166,10 @@ public class MTMVTask extends AbstractTask {
|
||||
// To be completely consistent with hive, you need to manually refresh the cache
|
||||
// refreshHmsTable();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
}
|
||||
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
|
||||
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
|
||||
this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
|
||||
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
|
||||
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
|
||||
return;
|
||||
@ -181,8 +186,8 @@ public class MTMVTask extends AbstractTask {
|
||||
Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds
|
||||
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
|
||||
// need get names before exec
|
||||
List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
|
||||
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVUtil
|
||||
List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
|
||||
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
|
||||
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds);
|
||||
exec(ctx, execPartitionIds, tableWithPartKey);
|
||||
completedPartitions.addAll(execPartitionNames);
|
||||
@ -241,7 +246,7 @@ public class MTMVTask extends AbstractTask {
|
||||
LOG.info("mtmv task before, taskId: {}", super.getTaskId());
|
||||
super.before();
|
||||
try {
|
||||
mtmv = getMTMV();
|
||||
mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
|
||||
} catch (UserException e) {
|
||||
LOG.warn("before task failed:", e);
|
||||
throw new JobException(e);
|
||||
@ -267,11 +272,6 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
|
||||
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
|
||||
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runTask() throws JobException {
|
||||
LOG.info("mtmv task runTask, taskId: {}", super.getTaskId());
|
||||
@ -296,7 +296,7 @@ public class MTMVTask extends AbstractTask {
|
||||
String dbName = "";
|
||||
String mvName = "";
|
||||
try {
|
||||
MTMV mtmv = getMTMV();
|
||||
MTMV mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
|
||||
dbName = mtmv.getQualifiedDbName();
|
||||
mvName = mtmv.getName();
|
||||
} catch (UserException e) {
|
||||
@ -386,20 +386,20 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
|
||||
private List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
|
||||
public List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
|
||||
// check whether the user manually triggers it
|
||||
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
|
||||
if (taskContext.isComplete()) {
|
||||
return mtmv.getPartitionIds();
|
||||
} else if (!CollectionUtils
|
||||
.isEmpty(taskContext.getPartitions())) {
|
||||
return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
|
||||
return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
|
||||
}
|
||||
}
|
||||
// check if data is fresh
|
||||
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
|
||||
// to avoid rebuilding the baseTable and causing a change in the tableId
|
||||
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables());
|
||||
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables());
|
||||
if (fresh) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
@ -413,7 +413,7 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
|
||||
// to avoid rebuilding the baseTable and causing a change in the tableId
|
||||
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables());
|
||||
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables());
|
||||
}
|
||||
|
||||
public MTMVTaskContext getTaskContext() {
|
||||
|
||||
@ -0,0 +1,438 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.SinglePartitionDesc;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVPartitionUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVPartitionUtil.class);
|
||||
|
||||
/**
|
||||
* Determine whether the partition is sync with retated partition and other baseTables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
* @param tables
|
||||
* @param excludedTriggerTables
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables) throws AnalysisException {
|
||||
boolean isSyncWithPartition = true;
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
// if follow base table, not need compare with related table, only should compare with related partition
|
||||
excludedTriggerTables.add(relatedTable.getName());
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
LOG.warn("can not found related partition: " + partitionId);
|
||||
return false;
|
||||
}
|
||||
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
|
||||
}
|
||||
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Align the partitions of mtmv and related tables, delete more and add less
|
||||
*
|
||||
* @param mtmv
|
||||
* @param relatedTable
|
||||
* @throws DdlException
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable)
|
||||
throws DdlException, AnalysisException {
|
||||
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems();
|
||||
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
|
||||
// drop partition of mtmv
|
||||
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
|
||||
if (partitionId == -1L) {
|
||||
dropPartition(mtmv, entry.getKey());
|
||||
}
|
||||
}
|
||||
// add partition for mtmv
|
||||
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
|
||||
if (partitionId == -1L) {
|
||||
addPartition(mtmv, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (Long partitionId : ids) {
|
||||
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (String partitionName : partitions) {
|
||||
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
|
||||
res.add(partition.getId());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if table is sync with all baseTables
|
||||
*
|
||||
* @param mtmv
|
||||
* @return
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv) {
|
||||
MTMVRelation mtmvRelation = mtmv.getRelation();
|
||||
if (mtmvRelation == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet());
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("isMTMVSync failed: ", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the mtmv is sync with tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param tables
|
||||
* @param excludeTables
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables)
|
||||
throws AnalysisException {
|
||||
Collection<Partition> partitions = mtmv.getPartitions();
|
||||
for (Partition partition : partitions) {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* get not sync tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
|
||||
TableIf table = MTMVUtil.getTable(baseTableInfo);
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
|
||||
if (!mtmvRelatedTableIf.needAutoRefresh()) {
|
||||
continue;
|
||||
}
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
|
||||
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
throw new AnalysisException("can not found related partition");
|
||||
}
|
||||
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf,
|
||||
relatedPartitionId);
|
||||
if (!isSyncWithPartition) {
|
||||
res.add(mtmvRelatedTableIf.getName());
|
||||
}
|
||||
} else {
|
||||
if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
|
||||
res.add(table.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the partitions that need to be refreshed
|
||||
*
|
||||
* @param mtmv
|
||||
* @param baseTables
|
||||
* @return
|
||||
*/
|
||||
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) {
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (Partition partition : allPartitions) {
|
||||
try {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
|
||||
mtmv.getExcludedTriggerTables())) {
|
||||
res.add(partition.getId());
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
res.add(partition.getId());
|
||||
LOG.warn("check isMTMVPartitionSync failed", e);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* compare last update time of mtmvPartition and tablePartition
|
||||
*
|
||||
* @param mtmv
|
||||
* @param mtmvPartitionId
|
||||
* @param relatedTable
|
||||
* @param relatedPartitionId
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
|
||||
MTMVRelatedTableIf relatedTable,
|
||||
Long relatedPartitionId) throws AnalysisException {
|
||||
if (!relatedTable.needAutoRefresh()) {
|
||||
return true;
|
||||
}
|
||||
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
|
||||
.getPartitionSnapshot(relatedPartitionId);
|
||||
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
|
||||
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
|
||||
return mtmv.getRefreshSnapshot()
|
||||
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* like p_00000101_20170201
|
||||
*
|
||||
* @param desc
|
||||
* @return
|
||||
*/
|
||||
public static String generatePartitionName(PartitionKeyDesc desc) {
|
||||
String partitionName = "p_";
|
||||
partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
|
||||
.replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
|
||||
if (partitionName.length() > 50) {
|
||||
partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName))
|
||||
+ "_" + System.currentTimeMillis();
|
||||
}
|
||||
return partitionName;
|
||||
}
|
||||
|
||||
/**
|
||||
* drop partition of mtmv
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
*/
|
||||
private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
|
||||
if (!mtmv.writeLockIfExist()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
|
||||
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
|
||||
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
|
||||
} finally {
|
||||
mtmv.writeUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* add partition for mtmv like relatedPartitionId of relatedTable
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionItem
|
||||
* @throws DdlException
|
||||
*/
|
||||
private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
|
||||
throws DdlException {
|
||||
PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc();
|
||||
Map<String, String> partitionProperties = Maps.newHashMap();
|
||||
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
|
||||
generatePartitionName(oldPartitionKeyDesc),
|
||||
oldPartitionKeyDesc, partitionProperties);
|
||||
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
|
||||
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
|
||||
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
|
||||
}
|
||||
|
||||
/**
|
||||
* compare PartitionItem and return equals partitionId
|
||||
* if not found, return -1L
|
||||
*
|
||||
* @param target
|
||||
* @param sources
|
||||
* @return
|
||||
*/
|
||||
private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) {
|
||||
for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
|
||||
if (target.equals(entry.getValue())) {
|
||||
return entry.getKey();
|
||||
}
|
||||
}
|
||||
return -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
|
||||
*
|
||||
* @param mtmvPartitionId
|
||||
* @param tables
|
||||
* @param excludedTriggerTables
|
||||
* @return
|
||||
*/
|
||||
private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables) throws AnalysisException {
|
||||
for (BaseTableInfo baseTableInfo : tables) {
|
||||
TableIf table = null;
|
||||
try {
|
||||
table = MTMVUtil.getTable(baseTableInfo);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("get table failed, {}", baseTableInfo, e);
|
||||
return false;
|
||||
}
|
||||
if (excludedTriggerTables.contains(table.getName())) {
|
||||
continue;
|
||||
}
|
||||
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo);
|
||||
if (!syncWithBaseTable) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo)
|
||||
throws AnalysisException {
|
||||
TableIf table = null;
|
||||
try {
|
||||
table = MTMVUtil.getTable(baseTableInfo);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("get table failed, {}", baseTableInfo, e);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
// if not MTMVRelatedTableIf, we can not get snapshot from it,
|
||||
// Currently, it is believed to be synchronous
|
||||
return true;
|
||||
}
|
||||
MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
|
||||
if (!baseTable.needAutoRefresh()) {
|
||||
return true;
|
||||
}
|
||||
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
|
||||
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
|
||||
return mtmv.getRefreshSnapshot()
|
||||
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
|
||||
}
|
||||
|
||||
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
|
||||
Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
|
||||
throws AnalysisException {
|
||||
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
|
||||
for (Long partitionId : partitionIds) {
|
||||
res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
|
||||
Set<BaseTableInfo> baseTables, Long partitionId)
|
||||
throws AnalysisException {
|
||||
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
|
||||
mtmv.getPartitionItems().get(partitionId),
|
||||
relatedTable);
|
||||
|
||||
for (Long relatedPartitionId : relatedPartitionIds) {
|
||||
MTMVSnapshotIf partitionSnapshot = relatedTable
|
||||
.getPartitionSnapshot(relatedPartitionId);
|
||||
refreshPartitionSnapshot.getPartitions()
|
||||
.put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot);
|
||||
}
|
||||
}
|
||||
for (BaseTableInfo baseTableInfo : baseTables) {
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
|
||||
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
|
||||
continue;
|
||||
}
|
||||
TableIf table = MTMVUtil.getTable(baseTableInfo);
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot());
|
||||
}
|
||||
return refreshPartitionSnapshot;
|
||||
}
|
||||
|
||||
private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
|
||||
MTMVRelatedTableIf relatedTable) {
|
||||
List<Long> res = Lists.newArrayList();
|
||||
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
|
||||
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
|
||||
if (mtmvPartitionItem.equals(entry.getValue())) {
|
||||
res.add(entry.getKey());
|
||||
// current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
@ -87,4 +87,19 @@ public interface MTMVRelatedTableIf extends TableIf {
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
String getPartitionName(long partitionId) throws AnalysisException;
|
||||
|
||||
/**
|
||||
* Does the current type of table allow timed triggering
|
||||
*
|
||||
* @return If return false,The method of comparing whether to synchronize will directly return true,
|
||||
* otherwise the snapshot information will be compared
|
||||
*/
|
||||
boolean needAutoRefresh();
|
||||
|
||||
/**
|
||||
* if allow partition column `isAllowNull`
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
boolean isPartitionColumnAllowNull();
|
||||
}
|
||||
|
||||
@ -71,7 +71,8 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
for (BaseTableInfo tableInfo : mvInfos) {
|
||||
try {
|
||||
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
|
||||
if (!CollectionUtils.isEmpty(MTMVUtil.getMTMVCanRewritePartitions(mtmv, ctx))) {
|
||||
if (!CollectionUtils
|
||||
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, System.currentTimeMillis()))) {
|
||||
res.add(mtmv);
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
|
||||
@ -0,0 +1,87 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public class MTMVRewriteUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
|
||||
|
||||
/**
|
||||
* Determine which partition of mtmv can be rewritten
|
||||
*
|
||||
* @param mtmv
|
||||
* @param ctx
|
||||
* @return
|
||||
*/
|
||||
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
|
||||
long currentTimeMills) {
|
||||
List<Partition> res = Lists.newArrayList();
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
// check session variable if enable rewrite
|
||||
if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
|
||||
return res;
|
||||
}
|
||||
if (MTMVUtil.mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
|
||||
.isMaterializedViewRewriteEnableContainExternalTable()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
MTMVRelation mtmvRelation = mtmv.getRelation();
|
||||
if (mtmvRelation == null) {
|
||||
return res;
|
||||
}
|
||||
// check mv is normal
|
||||
if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
|
||||
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
|
||||
return res;
|
||||
}
|
||||
// check gracePeriod
|
||||
long gracePeriodMills = mtmv.getGracePeriod();
|
||||
for (Partition partition : allPartitions) {
|
||||
if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime()
|
||||
+ gracePeriodMills)) {
|
||||
res.add(partition);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(),
|
||||
Sets.newHashSet())) {
|
||||
res.add(partition);
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
// ignore it
|
||||
LOG.warn("check isMTMVPartitionSync failed", e);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
@ -84,10 +84,10 @@ public class MTMVService {
|
||||
|
||||
public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
|
||||
Objects.requireNonNull(mtmv);
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
}
|
||||
LOG.info("createMTMV: " + mtmv.getName());
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVPartitionUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
}
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
mtmvHookService.createMTMV(mtmv);
|
||||
}
|
||||
|
||||
@ -17,39 +17,19 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.SinglePartitionDesc;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
|
||||
|
||||
/**
|
||||
* get Table by BaseTableInfo
|
||||
@ -66,380 +46,18 @@ public class MTMVUtil {
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the partition is sync with retated partition and other baseTables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
* @param tables
|
||||
* @param excludedTriggerTables
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables) throws AnalysisException {
|
||||
boolean isSyncWithPartition = true;
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
// if follow base table, not need compare with related table, only should compare with related partition
|
||||
excludedTriggerTables.add(relatedTable.getName());
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
LOG.warn("can not found related partition: " + partitionId);
|
||||
return false;
|
||||
}
|
||||
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
|
||||
}
|
||||
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables);
|
||||
|
||||
public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
|
||||
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Align the partitions of mtmv and related tables, delete more and add less
|
||||
*
|
||||
* @param mtmv
|
||||
* @param relatedTable
|
||||
* @throws DdlException
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable)
|
||||
throws DdlException, AnalysisException {
|
||||
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems();
|
||||
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
|
||||
// drop partition of mtmv
|
||||
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
|
||||
if (partitionId == -1L) {
|
||||
dropPartition(mtmv, entry.getKey());
|
||||
}
|
||||
}
|
||||
// add partition for mtmv
|
||||
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
|
||||
if (partitionId == -1L) {
|
||||
addPartition(mtmv, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (Long partitionId : ids) {
|
||||
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (String partitionName : partitions) {
|
||||
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
|
||||
res.add(partition.getId());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if table is sync with all baseTables
|
||||
* if base tables of mtmv contains external table
|
||||
*
|
||||
* @param mtmv
|
||||
* @return
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv) {
|
||||
MTMVRelation mtmvRelation = mtmv.getRelation();
|
||||
if (mtmvRelation == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet());
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("isMTMVSync failed: ", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the mtmv is sync with tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param tables
|
||||
* @param excludeTables
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables)
|
||||
throws AnalysisException {
|
||||
Collection<Partition> partitions = mtmv.getPartitions();
|
||||
for (Partition partition : partitions) {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* get not sync tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
|
||||
TableIf table = getTable(baseTableInfo);
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
|
||||
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
throw new AnalysisException("can not found related partition");
|
||||
}
|
||||
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf,
|
||||
relatedPartitionId);
|
||||
if (!isSyncWithPartition) {
|
||||
res.add(mtmvRelatedTableIf.getName());
|
||||
}
|
||||
} else {
|
||||
if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
|
||||
res.add(table.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine which partition of mtmv can be rewritten
|
||||
*
|
||||
* @param mtmv
|
||||
* @param ctx
|
||||
* @return
|
||||
*/
|
||||
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) {
|
||||
List<Partition> res = Lists.newArrayList();
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
// check session variable if enable rewrite
|
||||
if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
|
||||
return res;
|
||||
}
|
||||
if (mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
|
||||
.isMaterializedViewRewriteEnableContainExternalTable()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
MTMVRelation mtmvRelation = mtmv.getRelation();
|
||||
if (mtmvRelation == null) {
|
||||
return res;
|
||||
}
|
||||
// check mv is normal
|
||||
if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
|
||||
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
|
||||
return res;
|
||||
}
|
||||
// check gracePeriod
|
||||
long gracePeriodMills = mtmv.getGracePeriod();
|
||||
long currentTimeMills = System.currentTimeMillis();
|
||||
for (Partition partition : allPartitions) {
|
||||
if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime()
|
||||
+ gracePeriodMills)) {
|
||||
res.add(partition);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) {
|
||||
res.add(partition);
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
// ignore it
|
||||
LOG.warn("check isMTMVPartitionSync failed", e);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the partitions that need to be refreshed
|
||||
*
|
||||
* @param mtmv
|
||||
* @param baseTables
|
||||
* @return
|
||||
*/
|
||||
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) {
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (Partition partition : allPartitions) {
|
||||
try {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
|
||||
mtmv.getExcludedTriggerTables())) {
|
||||
res.add(partition.getId());
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
res.add(partition.getId());
|
||||
LOG.warn("check isMTMVPartitionSync failed", e);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* compare last update time of mtmvPartition and tablePartition
|
||||
*
|
||||
* @param mtmv
|
||||
* @param mtmvPartitionId
|
||||
* @param relatedTable
|
||||
* @param relatedPartitionId
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
|
||||
MTMVRelatedTableIf relatedTable,
|
||||
Long relatedPartitionId) throws AnalysisException {
|
||||
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
|
||||
.getPartitionSnapshot(relatedPartitionId);
|
||||
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
|
||||
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
|
||||
return mtmv.getRefreshSnapshot()
|
||||
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* like p_00000101_20170201
|
||||
*
|
||||
* @param desc
|
||||
* @return
|
||||
*/
|
||||
private static String generatePartitionName(PartitionKeyDesc desc) {
|
||||
String partitionName = "p_";
|
||||
partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
|
||||
.replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
|
||||
if (partitionName.length() > 50) {
|
||||
partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName))
|
||||
+ "_" + System.currentTimeMillis();
|
||||
}
|
||||
return partitionName;
|
||||
}
|
||||
|
||||
/**
|
||||
* drop partition of mtmv
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionId
|
||||
*/
|
||||
private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
|
||||
if (!mtmv.writeLockIfExist()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
|
||||
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
|
||||
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
|
||||
} finally {
|
||||
mtmv.writeUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* add partition for mtmv like relatedPartitionId of relatedTable
|
||||
*
|
||||
* @param mtmv
|
||||
* @param partitionItem
|
||||
* @throws DdlException
|
||||
*/
|
||||
private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
|
||||
throws DdlException {
|
||||
PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc();
|
||||
Map<String, String> partitionProperties = Maps.newHashMap();
|
||||
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
|
||||
generatePartitionName(oldPartitionKeyDesc),
|
||||
oldPartitionKeyDesc, partitionProperties);
|
||||
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
|
||||
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
|
||||
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
|
||||
}
|
||||
|
||||
/**
|
||||
* compare PartitionItem and return equals partitionId
|
||||
* if not found, return -1L
|
||||
*
|
||||
* @param target
|
||||
* @param sources
|
||||
* @return
|
||||
*/
|
||||
private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) {
|
||||
for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
|
||||
if (target.equals(entry.getValue())) {
|
||||
return entry.getKey();
|
||||
}
|
||||
}
|
||||
return -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
|
||||
*
|
||||
* @param mtmvPartitionId
|
||||
* @param tables
|
||||
* @param excludedTriggerTables
|
||||
* @return
|
||||
*/
|
||||
private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables) throws AnalysisException {
|
||||
for (BaseTableInfo baseTableInfo : tables) {
|
||||
TableIf table = null;
|
||||
try {
|
||||
table = getTable(baseTableInfo);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("get table failed, {}", baseTableInfo, e);
|
||||
return false;
|
||||
}
|
||||
if (excludedTriggerTables.contains(table.getName())) {
|
||||
continue;
|
||||
}
|
||||
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo);
|
||||
if (!syncWithBaseTable) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo)
|
||||
throws AnalysisException {
|
||||
TableIf table = null;
|
||||
try {
|
||||
table = getTable(baseTableInfo);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("get table failed, {}", baseTableInfo, e);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
// if not MTMVRelatedTableIf, we can not get snapshot from it,
|
||||
// Currently, it is believed to be synchronous
|
||||
return true;
|
||||
}
|
||||
MTMVRelatedTableIf baseTable = (MTMVRelatedTableIf) table;
|
||||
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
|
||||
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
|
||||
return mtmv.getRefreshSnapshot()
|
||||
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
|
||||
}
|
||||
|
||||
private static boolean mtmvContainsExternalTable(MTMV mtmv) {
|
||||
public static boolean mtmvContainsExternalTable(MTMV mtmv) {
|
||||
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
|
||||
for (BaseTableInfo baseTableInfo : baseTables) {
|
||||
if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) {
|
||||
@ -448,60 +66,4 @@ public class MTMVUtil {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
|
||||
Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
|
||||
throws AnalysisException {
|
||||
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
|
||||
for (Long partitionId : partitionIds) {
|
||||
res.put(mtmv.getPartition(partitionId).getName(), generatePartitionSnapshot(mtmv, baseTables, partitionId));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
|
||||
Set<BaseTableInfo> baseTables, Long partitionId)
|
||||
throws AnalysisException {
|
||||
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
List<Long> relatedPartitionIds = getMTMVPartitionRelatedPartitions(
|
||||
mtmv.getPartitionItems().get(partitionId),
|
||||
relatedTable);
|
||||
|
||||
for (Long relatedPartitionId : relatedPartitionIds) {
|
||||
MTMVSnapshotIf partitionSnapshot = relatedTable
|
||||
.getPartitionSnapshot(relatedPartitionId);
|
||||
refreshPartitionSnapshot.getPartitions()
|
||||
.put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot);
|
||||
}
|
||||
}
|
||||
for (BaseTableInfo baseTableInfo : baseTables) {
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
|
||||
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
|
||||
continue;
|
||||
}
|
||||
TableIf table = getTable(baseTableInfo);
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot());
|
||||
}
|
||||
return refreshPartitionSnapshot;
|
||||
}
|
||||
|
||||
private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
|
||||
MTMVRelatedTableIf relatedTable) {
|
||||
List<Long> res = Lists.newArrayList();
|
||||
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
|
||||
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
|
||||
if (mtmvPartitionItem.equals(entry.getValue())) {
|
||||
res.add(entry.getKey());
|
||||
// current, the partitioning of MTMV corresponds one-to-one with the partitioning of related table
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,7 +25,7 @@ import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mtmv.MTMVRewriteUtil;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
@ -315,8 +315,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
return ImmutableSet.of();
|
||||
}
|
||||
// get mv valid partitions
|
||||
Set<Long> mvDataValidPartitionIdSet = MTMVUtil.getMTMVCanRewritePartitions(mtmv,
|
||||
cascadesContext.getConnectContext()).stream()
|
||||
Set<Long> mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
|
||||
cascadesContext.getConnectContext(), System.currentTimeMillis()).stream()
|
||||
.map(Partition::getId)
|
||||
.collect(Collectors.toSet());
|
||||
Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
|
||||
|
||||
@ -255,7 +255,7 @@ public class MaterializedViewUtils {
|
||||
Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get();
|
||||
if (partitionColumnSet.contains(mvReferenceColumn)) {
|
||||
context.addTableColumn(table, mvReferenceColumn);
|
||||
context.setPctPossible(true);
|
||||
context.setPctPossible(!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull());
|
||||
}
|
||||
return visit(relation, context);
|
||||
}
|
||||
|
||||
@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mtmv.MTMVPartitionUtil;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
@ -67,7 +67,7 @@ public class RefreshMTMVInfo {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
|
||||
MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
if (!CollectionUtils.isEmpty(partitions)) {
|
||||
MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
|
||||
MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, partitions);
|
||||
}
|
||||
} catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) {
|
||||
throw new AnalysisException(e.getMessage());
|
||||
|
||||
@ -33,7 +33,7 @@ import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.job.common.JobType;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVJob;
|
||||
import org.apache.doris.job.task.AbstractTask;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mtmv.MTMVPartitionUtil;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -634,7 +634,7 @@ public class MetadataGenerator {
|
||||
trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
|
||||
trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
|
||||
trow.addToColumnValue(new TCell().setBoolVal(MTMVPartitionUtil.isMTMVSync(mv)));
|
||||
dataBatch.add(trow);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,187 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVPartitionUtilTest {
|
||||
@Mocked
|
||||
private MTMV mtmv;
|
||||
@Mocked
|
||||
private Partition p1;
|
||||
@Mocked
|
||||
private MTMVRelation relation;
|
||||
@Mocked
|
||||
private BaseTableInfo baseTableInfo;
|
||||
@Mocked
|
||||
private MTMVPartitionInfo mtmvPartitionInfo;
|
||||
@Mocked
|
||||
private OlapTable baseOlapTable;
|
||||
@Mocked
|
||||
private MTMVSnapshotIf baseSnapshotIf;
|
||||
@Mocked
|
||||
private MTMVRefreshSnapshot refreshSnapshot;
|
||||
@Mocked
|
||||
private MTMVUtil mtmvUtil;
|
||||
|
||||
private Set<BaseTableInfo> baseTables = Sets.newHashSet();
|
||||
|
||||
@Before
|
||||
public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException {
|
||||
baseTables.add(baseTableInfo);
|
||||
new Expectations() {
|
||||
{
|
||||
mtmv.getRelation();
|
||||
minTimes = 0;
|
||||
result = relation;
|
||||
|
||||
mtmv.getPartitions();
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList(p1);
|
||||
|
||||
p1.getId();
|
||||
minTimes = 0;
|
||||
result = 1L;
|
||||
|
||||
mtmv.getMvPartitionInfo();
|
||||
minTimes = 0;
|
||||
result = mtmvPartitionInfo;
|
||||
|
||||
mtmvPartitionInfo.getPartitionType();
|
||||
minTimes = 0;
|
||||
result = MTMVPartitionType.SELF_MANAGE;
|
||||
|
||||
mtmvUtil.getTable(baseTableInfo);
|
||||
minTimes = 0;
|
||||
result = baseOlapTable;
|
||||
|
||||
baseOlapTable.needAutoRefresh();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
baseOlapTable.getTableSnapshot();
|
||||
minTimes = 0;
|
||||
result = baseSnapshotIf;
|
||||
|
||||
mtmv.getPartitionName(anyLong);
|
||||
minTimes = 0;
|
||||
result = "p1";
|
||||
|
||||
mtmv.getRefreshSnapshot();
|
||||
minTimes = 0;
|
||||
result = refreshSnapshot;
|
||||
|
||||
refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
relation.getBaseTables();
|
||||
minTimes = 0;
|
||||
result = baseTables;
|
||||
|
||||
baseOlapTable.needAutoRefresh();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
baseOlapTable.getPartitionSnapshot(anyLong);
|
||||
minTimes = 0;
|
||||
result = baseSnapshotIf;
|
||||
|
||||
baseOlapTable.getPartitionName(anyLong);
|
||||
minTimes = 0;
|
||||
result = "p1";
|
||||
|
||||
refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsMTMVSyncNormal() {
|
||||
boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
|
||||
Assert.assertTrue(mtmvSync);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsMTMVSyncNotSync() {
|
||||
new Expectations() {
|
||||
{
|
||||
refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
boolean mtmvSync = MTMVPartitionUtil.isMTMVSync(mtmv);
|
||||
Assert.assertFalse(mtmvSync);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsSyncWithPartition() throws AnalysisException {
|
||||
boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
|
||||
Assert.assertTrue(isSyncWithPartition);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsSyncWithPartitionNotSync() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
boolean isSyncWithPartition = MTMVPartitionUtil.isSyncWithPartition(mtmv, 1L, baseOlapTable, 2L);
|
||||
Assert.assertFalse(isSyncWithPartition);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeneratePartitionName() {
|
||||
List<List<PartitionValue>> inValues = Lists.newArrayList();
|
||||
inValues.add(Lists.newArrayList(new PartitionValue("value11"), new PartitionValue("value12")));
|
||||
inValues.add(Lists.newArrayList(new PartitionValue("value21"), new PartitionValue("value22")));
|
||||
PartitionKeyDesc inDesc = PartitionKeyDesc.createIn(inValues);
|
||||
String inName = MTMVPartitionUtil.generatePartitionName(inDesc);
|
||||
Assert.assertEquals("p_value11_value12_value21_value22", inName);
|
||||
|
||||
PartitionKeyDesc rangeDesc = PartitionKeyDesc.createFixed(
|
||||
Lists.newArrayList(new PartitionValue(1L)),
|
||||
Lists.newArrayList(new PartitionValue(2L))
|
||||
);
|
||||
String rangeName = MTMVPartitionUtil.generatePartitionName(rangeDesc);
|
||||
Assert.assertEquals("p_1_2", rangeName);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,96 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class MTMVRefreshSnapshotTest {
|
||||
private String mvExistPartitionName = "mvp1";
|
||||
private String relatedExistPartitionName = "p1";
|
||||
private long baseExistTableId = 1L;
|
||||
private long correctVersion = 1L;
|
||||
private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot();
|
||||
private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion);
|
||||
private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion);
|
||||
|
||||
@Before
|
||||
public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException {
|
||||
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = Maps.newHashMap();
|
||||
MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot();
|
||||
partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot);
|
||||
mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot);
|
||||
mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot);
|
||||
refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionSync() {
|
||||
// normal
|
||||
boolean sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
|
||||
new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertTrue(sync);
|
||||
// non exist mv partition
|
||||
sync = refreshSnapshot.equalsWithRelatedPartition("mvp2", relatedExistPartitionName,
|
||||
new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
// non exist related partition
|
||||
sync = refreshSnapshot
|
||||
.equalsWithRelatedPartition(mvExistPartitionName, "p2", new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
// snapshot value not equal
|
||||
sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
|
||||
new MTMVVersionSnapshot(2L));
|
||||
Assert.assertFalse(sync);
|
||||
// snapshot type not equal
|
||||
sync = refreshSnapshot.equalsWithRelatedPartition(mvExistPartitionName, relatedExistPartitionName,
|
||||
new MTMVTimestampSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableSync() {
|
||||
// normal
|
||||
boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
|
||||
new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertTrue(sync);
|
||||
// non exist mv partition
|
||||
sync = refreshSnapshot
|
||||
.equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
// non exist related partition
|
||||
sync = refreshSnapshot
|
||||
.equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
// snapshot value not equal
|
||||
sync = refreshSnapshot
|
||||
.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L));
|
||||
Assert.assertFalse(sync);
|
||||
// snapshot type not equal
|
||||
sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId,
|
||||
new MTMVTimestampSnapshot(correctVersion));
|
||||
Assert.assertFalse(sync);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,254 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVRewriteUtilTest {
|
||||
@Mocked
|
||||
private MTMV mtmv;
|
||||
@Mocked
|
||||
private ConnectContext ctx;
|
||||
@Mocked
|
||||
private SessionVariable sessionVariable;
|
||||
@Mocked
|
||||
private Partition p1;
|
||||
@Mocked
|
||||
private MTMVRelation relation;
|
||||
@Mocked
|
||||
private MTMVStatus status;
|
||||
@Mocked
|
||||
private MTMVPartitionUtil mtmvPartitionUtil;
|
||||
@Mocked
|
||||
private MTMVUtil mtmvUtil;
|
||||
private long currentTimeMills = 3L;
|
||||
|
||||
@Before
|
||||
public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException {
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
mtmv.getPartitions();
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList(p1);
|
||||
|
||||
p1.getVisibleVersionTime();
|
||||
minTimes = 0;
|
||||
result = 1L;
|
||||
|
||||
mtmv.getGracePeriod();
|
||||
minTimes = 0;
|
||||
result = 0L;
|
||||
|
||||
mtmv.getRelation();
|
||||
minTimes = 0;
|
||||
result = relation;
|
||||
|
||||
mtmv.getStatus();
|
||||
minTimes = 0;
|
||||
result = status;
|
||||
|
||||
mtmv.getGracePeriod();
|
||||
minTimes = 0;
|
||||
result = 0L;
|
||||
|
||||
status.getState();
|
||||
minTimes = 0;
|
||||
result = MTMVState.NORMAL;
|
||||
|
||||
status.getRefreshState();
|
||||
minTimes = 0;
|
||||
result = MTMVRefreshState.SUCCESS;
|
||||
|
||||
ctx.getSessionVariable();
|
||||
minTimes = 0;
|
||||
result = sessionVariable;
|
||||
|
||||
sessionVariable.isEnableMaterializedViewRewrite();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
MTMVUtil.mtmvContainsExternalTable((MTMV) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsNormal() {
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsInGracePeriod() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
mtmv.getGracePeriod();
|
||||
minTimes = 0;
|
||||
result = 2L;
|
||||
|
||||
MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
mtmv.getGracePeriod();
|
||||
minTimes = 0;
|
||||
result = 1L;
|
||||
|
||||
MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() {
|
||||
new Expectations() {
|
||||
{
|
||||
sessionVariable.isEnableMaterializedViewRewrite();
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsEnableContainExternalTable() {
|
||||
new Expectations() {
|
||||
{
|
||||
MTMVUtil.mtmvContainsExternalTable((MTMV) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsDisableContainExternalTable() {
|
||||
new Expectations() {
|
||||
{
|
||||
MTMVUtil.mtmvContainsExternalTable((MTMV) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
sessionVariable.isMaterializedViewRewriteEnableContainExternalTable();
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsStateAbnormal() {
|
||||
new Expectations() {
|
||||
{
|
||||
status.getState();
|
||||
minTimes = 0;
|
||||
result = MTMVState.SCHEMA_CHANGE;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMTMVCanRewritePartitionsRefreshStateAbnormal() {
|
||||
new Expectations() {
|
||||
{
|
||||
status.getRefreshState();
|
||||
minTimes = 0;
|
||||
result = MTMVRefreshState.FAIL;
|
||||
}
|
||||
};
|
||||
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
|
||||
.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills);
|
||||
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
|
||||
}
|
||||
|
||||
}
|
||||
163
fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
Normal file
163
fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java
Normal file
@ -0,0 +1,163 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVTaskTest {
|
||||
private long poneId = 1L;
|
||||
private String poneName = "p1";
|
||||
private long ptwoId = 2L;
|
||||
private String ptwoName = "p2";
|
||||
private List<Long> allPartitionIds = Lists.newArrayList(poneId, ptwoId);
|
||||
private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(), Sets.newHashSet());
|
||||
|
||||
@Mocked
|
||||
private MTMV mtmv;
|
||||
@Mocked
|
||||
private MTMVUtil mtmvUtil;
|
||||
@Mocked
|
||||
private MTMVPartitionUtil mtmvPartitionUtil;
|
||||
@Mocked
|
||||
private MTMVPartitionInfo mtmvPartitionInfo;
|
||||
@Mocked
|
||||
private MTMVRefreshInfo mtmvRefreshInfo;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
mtmvUtil.getMTMV(anyLong, anyLong);
|
||||
minTimes = 0;
|
||||
result = mtmv;
|
||||
|
||||
mtmv.getPartitionIds();
|
||||
minTimes = 0;
|
||||
result = allPartitionIds;
|
||||
|
||||
mtmv.getMvPartitionInfo();
|
||||
minTimes = 0;
|
||||
result = mtmvPartitionInfo;
|
||||
|
||||
mtmvPartitionInfo.getPartitionType();
|
||||
minTimes = 0;
|
||||
result = MTMVPartitionType.FOLLOW_BASE_TABLE;
|
||||
|
||||
mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(poneName));
|
||||
minTimes = 0;
|
||||
result = poneId;
|
||||
|
||||
mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
mtmv.getRefreshInfo();
|
||||
minTimes = 0;
|
||||
result = mtmvRefreshInfo;
|
||||
|
||||
mtmvRefreshInfo.getRefreshMethod();
|
||||
minTimes = 0;
|
||||
result = RefreshMethod.COMPLETE;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException {
|
||||
MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), true);
|
||||
MTMVTask task = new MTMVTask(mtmv, relation, context);
|
||||
List<Long> result = task.calculateNeedRefreshPartitions();
|
||||
Assert.assertEquals(allPartitionIds, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException {
|
||||
MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false);
|
||||
MTMVTask task = new MTMVTask(mtmv, relation, context);
|
||||
List<Long> result = task.calculateNeedRefreshPartitions();
|
||||
Assert.assertEquals(Lists.newArrayList(poneId), result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException {
|
||||
MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
|
||||
MTMVTask task = new MTMVTask(mtmv, relation, context);
|
||||
List<Long> result = task.calculateNeedRefreshPartitions();
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(result));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
|
||||
MTMVTask task = new MTMVTask(mtmv, relation, context);
|
||||
List<Long> result = task.calculateNeedRefreshPartitions();
|
||||
Assert.assertEquals(allPartitionIds, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws AnalysisException {
|
||||
new Expectations() {
|
||||
{
|
||||
mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any);
|
||||
minTimes = 0;
|
||||
result = false;
|
||||
|
||||
mtmvRefreshInfo.getRefreshMethod();
|
||||
minTimes = 0;
|
||||
result = RefreshMethod.AUTO;
|
||||
|
||||
mtmvPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any);
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList(ptwoId);
|
||||
}
|
||||
};
|
||||
MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
|
||||
MTMVTask task = new MTMVTask(mtmv, relation, context);
|
||||
List<Long> result = task.calculateNeedRefreshPartitions();
|
||||
Assert.assertEquals(Lists.newArrayList(ptwoId), result);
|
||||
}
|
||||
}
|
||||
@ -26,7 +26,6 @@ import org.apache.doris.nereids.util.PlanChecker;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
@ -252,7 +251,6 @@ public class MaterializedViewUtilsTest extends TestWithFeService {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void getRelatedTableInfoTestWithoutGroupNullTest() {
|
||||
PlanChecker.from(connectContext)
|
||||
.checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias, l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, "
|
||||
|
||||
@ -8,6 +8,11 @@
|
||||
1 A 20230101
|
||||
2 B 20230101
|
||||
3 C 20230101
|
||||
|
||||
-- !refresh_complete --
|
||||
1 A 20230101
|
||||
2 B 20230101
|
||||
3 C 20230101
|
||||
4 D 20230102
|
||||
5 E 20230102
|
||||
6 F 20230102
|
||||
|
||||
@ -57,12 +57,20 @@ suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive")
|
||||
order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by id"
|
||||
|
||||
//refresh other partitions
|
||||
// current, for hive, auto refresh will not change data
|
||||
sql """
|
||||
REFRESH MATERIALIZED VIEW ${mvName}
|
||||
"""
|
||||
waitingMTMVTaskFinished(jobName)
|
||||
order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by id"
|
||||
|
||||
//refresh complete
|
||||
sql """
|
||||
REFRESH MATERIALIZED VIEW ${mvName} complete
|
||||
"""
|
||||
waitingMTMVTaskFinished(jobName)
|
||||
order_qt_refresh_complete "SELECT * FROM ${mvName} order by id"
|
||||
|
||||
sql """drop materialized view if exists ${mvName};"""
|
||||
|
||||
sql """drop catalog if exists ${catalog_name}"""
|
||||
|
||||
Reference in New Issue
Block a user