[enhance](mtmv) MTMV deal partition use name instead of id (#34910)

partition id will change when insert overwrite

When the materialized view runs a task, if the base table is in insert overwrite, the materialized view task may report an error: partition not found by partitionId

Upgrade compatibility: Hive currently does not support automatic refresh, so it has no impact
This commit is contained in:
zhangdong
2024-05-17 17:31:22 +08:00
committed by yiguolei
parent e3e5f18f26
commit 435147d449
17 changed files with 236 additions and 207 deletions

View File

@ -305,12 +305,12 @@ public class MTMV extends OlapTable {
/**
* generateMvPartitionDescs
*
* @return mvPartitionId ==> mvPartitionKeyDesc
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() {
Map<Long, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<Long, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
}
return result;
@ -321,19 +321,19 @@ public class MTMV extends OlapTable {
* It is the result of real-time comparison calculation, so there may be some costs,
* so it should be called with caution
*
* @return mvPartitionId ==> relationPartitionIds
* @return mvPartitionName ==> relationPartitionNames
* @throws AnalysisException
*/
public Map<Long, Set<Long>> calculatePartitionMappings() throws AnalysisException {
public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
long start = System.currentTimeMillis();
Map<Long, Set<Long>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = MTMVPartitionUtil
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
}

View File

@ -95,6 +95,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -1037,6 +1038,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return partition;
}
public PartitionItem getPartitionItemOrAnalysisException(String partitionName) throws AnalysisException {
Partition partition = nameToPartition.get(partitionName);
if (partition == null) {
partition = tempPartitions.getPartition(partitionName);
}
if (partition == null) {
throw new AnalysisException("partition not found: " + partitionName);
}
return partitionInfo.getItem(partition.getId());
}
public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException {
Partition partition = idToPartition.get(partitionId);
if (partition == null) {
@ -2657,10 +2669,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public Map<Long, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems() {
readLock();
try {
return Maps.newHashMap(getPartitionInfo().getIdToItem(false));
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
if (partition != null) {
res.put(partition.getName(), entry.getValue());
}
}
return res;
} finally {
readUnlock();
}
@ -2672,8 +2691,8 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException {
long visibleVersion = getPartitionOrAnalysisException(partitionId).getVisibleVersion();
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}

View File

@ -878,12 +878,22 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public Map<Long, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
return hivePartitionValues.getIdToPartitionItem();
Map<String, PartitionItem> res = Maps.newHashMap();
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
try {
res.put(getPartitionName(entry.getKey()), entry.getValue());
} catch (AnalysisException e) {
LOG.info("can not get partitionName by: " + entry.getKey());
}
}
return res;
}
@Override
@ -905,35 +915,35 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionId);
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionName);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
}
@Override
public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime());
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
long partitionId = 0L;
String partitionName = "";
long maxVersionTime = 0L;
long visibleVersionTime;
for (Entry<Long, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) {
for (Entry<String, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) {
visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
if (visibleVersionTime > maxVersionTime) {
maxVersionTime = visibleVersionTime;
partitionId = entry.getKey();
partitionName = entry.getKey();
}
}
return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime);
return new MTMVMaxTimestampSnapshot(partitionName, maxVersionTime);
}
private long getPartitionLastModifyTime(long partitionId) throws AnalysisException {
return getPartitionById(partitionId).getLastModifiedTime();
private long getPartitionLastModifyTime(String partitionName) throws AnalysisException {
return getPartitionByName(partitionName).getLastModifiedTime();
}
private HivePartition getPartitionById(long partitionId) throws AnalysisException {
PartitionItem item = getAndCopyPartitionItems().get(partitionId);
private HivePartition getPartitionByName(String partitionName) throws AnalysisException {
PartitionItem item = getAndCopyPartitionItems().get(partitionName);
List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item);
List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList);
if (partitions.size() != 1) {

View File

@ -173,30 +173,28 @@ public class MTMVTask extends AbstractTask {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings();
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(partitionMappings);
this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings();
this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings);
this.refreshMode = generateRefreshMode(needRefreshPartitions);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
}
Map<TableIf, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newCopyOnWriteArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
long execNum = (needRefreshPartitions.size() / refreshPartitionNum) + ((needRefreshPartitions.size()
% refreshPartitionNum) > 0 ? 1 : 0);
this.partitionSnapshots = Maps.newConcurrentMap();
for (int i = 0; i < execNum; i++) {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
Set<String> execPartitionNames = Sets.newHashSet(needRefreshPartitions
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
// need get names before exec
List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds,
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionNames,
partitionMappings);
exec(ctx, execPartitionIds, tableWithPartKey);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
}
@ -218,15 +216,15 @@ public class MTMVTask extends AbstractTask {
}
}
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
Map<TableIf, String> tableWithPartKey)
private void exec(ConnectContext ctx, Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
throws Exception {
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey);
? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey);
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
ctx.setQueryId(queryId);
@ -252,7 +250,7 @@ public class MTMVTask extends AbstractTask {
}
@Override
protected synchronized void executeCancelLogic() {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel();
@ -407,29 +405,30 @@ public class MTMVTask extends AbstractTask {
return tableWithPartKey;
}
private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) {
private MTMVTaskRefreshMode generateRefreshMode(List<String> needRefreshPartitionIds) {
if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
return MTMVTaskRefreshMode.NOT_REFRESH;
} else if (needRefreshPartitionIds.size() == mtmv.getPartitionIds().size()) {
} else if (needRefreshPartitionIds.size() == mtmv.getPartitionNames().size()) {
return MTMVTaskRefreshMode.COMPLETE;
} else {
return MTMVTaskRefreshMode.PARTIAL;
}
}
public List<Long> calculateNeedRefreshPartitions(Map<Long, Set<Long>> partitionMappings) throws AnalysisException {
public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> partitionMappings)
throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
return mtmv.getPartitionIds();
return Lists.newArrayList(mtmv.getPartitionNames());
} else if (!CollectionUtils
.isEmpty(taskContext.getPartitions())) {
return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
return taskContext.getPartitions();
}
}
// if refreshMethod is COMPLETE, we must FULL refresh, avoid external table MTMV always not refresh
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
return mtmv.getPartitionIds();
return Lists.newArrayList(mtmv.getPartitionNames());
}
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
@ -441,7 +440,7 @@ public class MTMVTask extends AbstractTask {
}
// current, if partitionType is SELF_MANAGE, we can only FULL refresh
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return mtmv.getPartitionIds();
return Lists.newArrayList(mtmv.getPartitionNames());
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId

View File

@ -25,17 +25,17 @@ import com.google.gson.annotations.SerializedName;
* so the update time is used instead of the version
*/
public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf {
// partitionId corresponding to timestamp
// The reason why both timestamp and partitionId are stored is to avoid
// partitionName corresponding to timestamp
// The reason why both timestamp and partitionName are stored is to avoid
// deleting the partition corresponding to timestamp
@SerializedName("p")
private long partitionId;
@SerializedName("pn")
private String partitionName;
// The maximum modify time in all partitions
@SerializedName("t")
private long timestamp;
public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) {
this.partitionId = partitionId;
public MTMVMaxTimestampSnapshot(String partitionName, long timestamp) {
this.partitionName = partitionName;
this.timestamp = timestamp;
}
@ -48,19 +48,18 @@ public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf {
return false;
}
MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o;
return partitionId == that.partitionId
&& timestamp == that.timestamp;
return timestamp == that.timestamp && Objects.equal(partitionName, that.partitionName);
}
@Override
public int hashCode() {
return Objects.hashCode(partitionId, timestamp);
return Objects.hashCode(partitionName, timestamp);
}
@Override
public String toString() {
return "MTMVMaxTimestampSnapshot{"
+ "partitionId=" + partitionId
+ "partitionName='" + partitionName + '\''
+ ", timestamp=" + timestamp
+ '}';
}

View File

@ -73,14 +73,14 @@ public class MTMVPartitionUtil {
* Determine whether the partition is sync with retated partition and other baseTables
*
* @param mtmv
* @param partitionId
* @param relatedPartitionIds
* @param partitionName
* @param relatedPartitionNames
* @param tables
* @param excludedTriggerTables
* @return
* @throws AnalysisException
*/
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds,
public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set<String> relatedPartitionNames,
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
@ -88,14 +88,14 @@ public class MTMVPartitionUtil {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
// if follow base table, not need compare with related table, only should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
if (CollectionUtils.isEmpty(relatedPartitionIds)) {
if (CollectionUtils.isEmpty(relatedPartitionNames)) {
LOG.warn("can not found related partition, partitionId: {}, mtmvName: {}, relatedTableName: {}",
partitionId, mtmv.getName(), relatedTable.getName());
partitionName, mtmv.getName(), relatedTable.getName());
return false;
}
isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, relatedTable, relatedPartitionIds);
isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, relatedTable, relatedPartitionNames);
}
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables);
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionName, tables, excludedTriggerTables);
}
@ -108,11 +108,11 @@ public class MTMVPartitionUtil {
*/
public static void alignMvPartition(MTMV mtmv)
throws DdlException, AnalysisException {
Map<Long, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Map<String, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs();
Set<PartitionKeyDesc> relatedPartitionDescs = generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(),
mtmv.getMvProperties()).keySet();
// drop partition of mtmv
for (Entry<Long, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) {
for (Entry<String, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) {
if (!relatedPartitionDescs.contains(entry.getValue())) {
dropPartition(mtmv, entry.getKey());
}
@ -152,7 +152,7 @@ public class MTMVPartitionUtil {
return res;
}
public static Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo,
public static Map<PartitionKeyDesc, Set<String>> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo,
Map<String, String> mvProperties) throws AnalysisException {
long start = System.currentTimeMillis();
RelatedPartitionDescResult result = new RelatedPartitionDescResult();
@ -219,11 +219,11 @@ public class MTMVPartitionUtil {
* @throws AnalysisException
*/
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables,
Map<Long, Set<Long>> partitionMappings)
Map<String, Set<String>> partitionMappings)
throws AnalysisException {
List<Long> partitionIds = mtmv.getPartitionIds();
for (Long partitionId : partitionIds) {
if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), tables,
Set<String> partitionNames = mtmv.getPartitionNames();
for (String partitionName : partitionNames) {
if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), tables,
excludeTables)) {
return false;
}
@ -236,20 +236,22 @@ public class MTMVPartitionUtil {
*
* @param mtmv
* @param partitionIds
* @return partitionId ==> UnSyncTableNames
* @return partitionName ==> UnSyncTableNames
* @throws AnalysisException
*/
public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds)
throws AnalysisException {
Map<Long, List<String>> res = Maps.newHashMap();
Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings();
Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings();
for (Long partitionId : partitionIds) {
res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionId, partitionMappings.get(partitionId)));
String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName();
res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName, partitionMappings.get(partitionName)));
}
return res;
}
private static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds)
private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partitionName,
Set<String> relatedPartitionNames)
throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
@ -263,16 +265,16 @@ public class MTMVPartitionUtil {
}
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
if (CollectionUtils.isEmpty(relatedPartitionIds)) {
if (CollectionUtils.isEmpty(relatedPartitionNames)) {
throw new AnalysisException("can not found related partition");
}
boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, mtmvRelatedTableIf,
relatedPartitionIds);
boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, mtmvRelatedTableIf,
relatedPartitionNames);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
} else {
if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) {
res.add(table.getName());
}
}
@ -287,18 +289,18 @@ public class MTMVPartitionUtil {
* @param baseTables
* @return
*/
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables,
Map<Long, Set<Long>> partitionMappings) {
List<Long> partitionIds = mtmv.getPartitionIds();
List<Long> res = Lists.newArrayList();
for (Long partitionId : partitionIds) {
public static List<String> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables,
Map<String, Set<String>> partitionMappings) {
Set<String> partitionNames = mtmv.getPartitionNames();
List<String> res = Lists.newArrayList();
for (String partitionName : partitionNames) {
try {
if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), baseTables,
if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partitionId);
res.add(partitionName);
}
} catch (AnalysisException e) {
res.add(partitionId);
res.add(partitionName);
LOG.warn("check isMTMVPartitionSync failed", e);
}
}
@ -309,23 +311,21 @@ public class MTMVPartitionUtil {
* Compare the current and last updated partition (or table) snapshot of the associated partition (or table)
*
* @param mtmv
* @param mtmvPartitionId
* @param mtmvPartitionName
* @param relatedTable
* @param relatedPartitionIds
* @param relatedPartitionNames
* @return
* @throws AnalysisException
*/
public static boolean isSyncWithPartitions(MTMV mtmv, Long mtmvPartitionId,
public static boolean isSyncWithPartitions(MTMV mtmv, String mtmvPartitionName,
MTMVRelatedTableIf relatedTable,
Set<Long> relatedPartitionIds) throws AnalysisException {
Set<String> relatedPartitionNames) throws AnalysisException {
if (!relatedTable.needAutoRefresh()) {
return true;
}
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
for (Long relatedPartitionId : relatedPartitionIds) {
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
.getPartitionSnapshot(relatedPartitionName);
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
@ -355,15 +355,14 @@ public class MTMVPartitionUtil {
* drop partition of mtmv
*
* @param mtmv
* @param partitionId
* @param partitionName
*/
private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
private static void dropPartition(MTMV mtmv, String partitionName) throws DdlException {
if (!mtmv.writeLockIfExist()) {
return;
}
try {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partitionName, false, false);
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
} finally {
mtmv.writeUnlock();
@ -394,12 +393,12 @@ public class MTMVPartitionUtil {
/**
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
*
* @param mtmvPartitionId
* @param mtmvPartitionName
* @param tables
* @param excludedTriggerTables
* @return
*/
private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables,
private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionName, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
@ -412,7 +411,7 @@ public class MTMVPartitionUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo);
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionName, baseTableInfo);
if (!syncWithBaseTable) {
return false;
}
@ -420,7 +419,7 @@ public class MTMVPartitionUtil {
return true;
}
private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo)
private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, BaseTableInfo baseTableInfo)
throws AnalysisException {
TableIf table = null;
try {
@ -440,7 +439,6 @@ public class MTMVPartitionUtil {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot();
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
}
@ -450,35 +448,35 @@ public class MTMVPartitionUtil {
*
* @param mtmv
* @param baseTables
* @param partitionIds
* @param partitionNames
* @param partitionMappings
* @return
* @throws AnalysisException
*/
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
Set<BaseTableInfo> baseTables, Set<Long> partitionIds,
Map<Long, Set<Long>> partitionMappings)
Set<BaseTableInfo> baseTables, Set<String> partitionNames,
Map<String, Set<String>> partitionMappings)
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (Long partitionId : partitionIds) {
res.put(mtmv.getPartitionName(partitionId),
generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId)));
for (String partitionName : partitionNames) {
res.put(partitionName,
generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionName)));
}
return res;
}
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds)
Set<BaseTableInfo> baseTables, Set<String> relatedPartitionNames)
throws AnalysisException {
MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot();
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (Long relatedPartitionId : relatedPartitionIds) {
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
.getPartitionSnapshot(relatedPartitionName);
refreshPartitionSnapshot.getPartitions()
.put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot);
.put(relatedPartitionName, partitionSnapshot);
}
}
for (BaseTableInfo baseTableInfo : baseTables) {

View File

@ -51,10 +51,10 @@ public class MTMVRelatedPartitionDescOnePartitionColGenerator implements MTMVRel
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return;
}
Map<PartitionKeyDesc, Set<Long>> res = Maps.newHashMap();
Map<Long, PartitionItem> relatedPartitionItems = lastResult.getItems();
Map<PartitionKeyDesc, Set<String>> res = Maps.newHashMap();
Map<String, PartitionItem> relatedPartitionItems = lastResult.getItems();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
for (Entry<String, PartitionItem> entry : relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
if (res.containsKey(partitionKeyDesc)) {
res.get(partitionKeyDesc).add(entry.getKey());

View File

@ -69,27 +69,27 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti
* @return
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> rollUpList(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs,
public Map<PartitionKeyDesc, Set<String>> rollUpList(Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs,
MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties) throws AnalysisException {
Map<String, Set<String>> identityToValues = Maps.newHashMap();
Map<String, Set<Long>> identityToPartitionIds = Maps.newHashMap();
Map<String, Set<String>> identityToPartitionNames = Maps.newHashMap();
MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr());
for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) {
for (Entry<PartitionKeyDesc, Set<String>> entry : relatedPartitionDescs.entrySet()) {
String rollUpIdentity = exprSerice.getRollUpIdentity(entry.getKey(), mvProperties);
Preconditions.checkNotNull(rollUpIdentity);
if (identityToValues.containsKey(rollUpIdentity)) {
identityToValues.get(rollUpIdentity).addAll(getStringValues(entry.getKey()));
identityToPartitionIds.get(rollUpIdentity).addAll(entry.getValue());
identityToPartitionNames.get(rollUpIdentity).addAll(entry.getValue());
} else {
identityToValues.put(rollUpIdentity, getStringValues(entry.getKey()));
identityToPartitionIds.put(rollUpIdentity, entry.getValue());
identityToPartitionNames.put(rollUpIdentity, entry.getValue());
}
}
Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> result = Maps.newHashMap();
for (Entry<String, Set<String>> entry : identityToValues.entrySet()) {
result.put(PartitionKeyDesc.createIn(getPartitionValues(entry.getValue())),
identityToPartitionIds.get(entry.getKey()));
identityToPartitionNames.get(entry.getKey()));
}
return result;
}
@ -125,11 +125,11 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti
* @return
* @throws AnalysisException
*/
public Map<PartitionKeyDesc, Set<Long>> rollUpRange(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs,
public Map<PartitionKeyDesc, Set<String>> rollUpRange(Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs,
MTMVPartitionInfo mvPartitionInfo) throws AnalysisException {
Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> result = Maps.newHashMap();
MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr());
for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) {
for (Entry<PartitionKeyDesc, Set<String>> entry : relatedPartitionDescs.entrySet()) {
PartitionKeyDesc rollUpDesc = exprSerice.generateRollUpPartitionKeyDesc(entry.getKey(), mvPartitionInfo);
if (result.containsKey(rollUpDesc)) {
result.get(rollUpDesc).addAll(entry.getValue());

View File

@ -43,16 +43,16 @@ public class MTMVRelatedPartitionDescSyncLimitGenerator implements MTMVRelatedPa
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
Map<Long, PartitionItem> partitionItems = lastResult.getItems();
Map<String, PartitionItem> partitionItems = lastResult.getItems();
MTMVPartitionSyncConfig config = generateMTMVPartitionSyncConfigByProperties(mvProperties);
if (config.getSyncLimit() <= 0) {
return;
}
long nowTruncSubSec = getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit());
Optional<String> dateFormat = config.getDateFormat();
Map<Long, PartitionItem> res = Maps.newHashMap();
Map<String, PartitionItem> res = Maps.newHashMap();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) {
for (Entry<String, PartitionItem> entry : partitionItems.entrySet()) {
if (entry.getValue().isGreaterThanSpecifiedTime(relatedColPos, dateFormat, nowTruncSubSec)) {
res.put(entry.getKey(), entry.getValue());
}

View File

@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf {
*
* @return partitionId->PartitionItem
*/
Map<Long, PartitionItem> getAndCopyPartitionItems();
Map<String, PartitionItem> getAndCopyPartitionItems();
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
@ -65,11 +65,11 @@ public interface MTMVRelatedTableIf extends TableIf {
/**
* getPartitionSnapshot
*
* @param partitionId
* @param partitionName
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException;
MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException;
/**
* getTableSnapshot

View File

@ -66,7 +66,7 @@ public class MTMVRewriteUtil {
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
return res;
}
Map<Long, Set<Long>> partitionMappings = null;
Map<String, Set<String>> partitionMappings = null;
// check gracePeriod
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
@ -79,8 +79,8 @@ public class MTMVRewriteUtil {
if (partitionMappings == null) {
partitionMappings = mtmv.calculatePartitionMappings();
}
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(),
partitionMappings.get(partition.getId()), mtmvRelation.getBaseTables(),
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTables(),
Sets.newHashSet())) {
res.add(partition);
}

View File

@ -27,27 +27,27 @@ import java.util.Set;
public class RelatedPartitionDescResult {
// PartitionKeyDesc to relatedTable partition ids(Different partitions may have the same PartitionKeyDesc)
private Map<PartitionKeyDesc, Set<Long>> descs;
private Map<Long, PartitionItem> items;
private Map<PartitionKeyDesc, Set<String>> descs;
private Map<String, PartitionItem> items;
public RelatedPartitionDescResult() {
this.descs = Maps.newHashMap();
this.items = Maps.newHashMap();
}
public Map<PartitionKeyDesc, Set<Long>> getDescs() {
public Map<PartitionKeyDesc, Set<String>> getDescs() {
return descs;
}
public void setDescs(Map<PartitionKeyDesc, Set<Long>> descs) {
public void setDescs(Map<PartitionKeyDesc, Set<String>> descs) {
this.descs = descs;
}
public Map<Long, PartitionItem> getItems() {
public Map<String, PartitionItem> getItems() {
return items;
}
public void setItems(Map<Long, PartitionItem> items) {
public void setItems(Map<String, PartitionItem> items) {
this.items = items;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
@ -56,6 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
@ -79,16 +81,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
* Construct command
*
* @param mv materialize view
* @param partitionIds update partitions in mv and tables
* @param partitionNames update partitions in mv and tables
* @param tableWithPartKey the partitions key for different table
* @return command
*/
public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds,
public static UpdateMvByPartitionCommand from(MTMV mv, Set<String> partitionNames,
Map<TableIf, String> tableWithPartKey) throws UserException {
NereidsParser parser = new NereidsParser();
Map<TableIf, Set<Expression>> predicates =
constructTableWithPredicates(mv, partitionIds, tableWithPartKey);
List<String> parts = constructPartsForMv(mv, partitionIds);
constructTableWithPredicates(mv, partitionNames, tableWithPartKey);
List<String> parts = constructPartsForMv(partitionNames);
Plan plan = parser.parseSingle(mv.getQuerySql());
plan = plan.accept(new PredicateAdder(), predicates);
if (plan instanceof Sink) {
@ -99,17 +101,17 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
return new UpdateMvByPartitionCommand(sink);
}
private static List<String> constructPartsForMv(MTMV mv, Set<Long> partitionIds) {
return partitionIds.stream()
.map(id -> mv.getPartition(id).getName())
.collect(ImmutableList.toImmutableList());
private static List<String> constructPartsForMv(Set<String> partitionNames) {
return Lists.newArrayList(partitionNames);
}
private static Map<TableIf, Set<Expression>> constructTableWithPredicates(MTMV mv,
Set<Long> partitionIds, Map<TableIf, String> tableWithPartKey) {
Set<PartitionItem> items = partitionIds.stream()
.map(id -> mv.getPartitionInfo().getItem(id))
.collect(ImmutableSet.toImmutableSet());
Set<String> partitionNames, Map<TableIf, String> tableWithPartKey) throws AnalysisException {
Set<PartitionItem> items = Sets.newHashSet();
for (String partitionName : partitionNames) {
PartitionItem partitionItem = mv.getPartitionItemOrAnalysisException(partitionName);
items.add(partitionItem);
}
ImmutableMap.Builder<TableIf, Set<Expression>> builder = new ImmutableMap.Builder<>();
tableWithPartKey.forEach((table, colName) ->
builder.put(table, constructPredicates(items, colName))