pick: https://github.com/apache/doris/pull/40433
This commit is contained in:
@ -2879,9 +2879,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
|
||||
throws AnalysisException {
|
||||
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
|
||||
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
|
||||
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
|
||||
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
|
||||
return new MTMVVersionSnapshot(visibleVersion);
|
||||
return new MTMVVersionSnapshot(visibleVersion, partitionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -18,6 +18,9 @@
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -74,6 +77,46 @@ public class MTMVRefreshPartitionSnapshot {
|
||||
}
|
||||
|
||||
public void compatible(MTMV mtmv) {
|
||||
try {
|
||||
// snapshot add partitionId resolve problem of insert overwrite
|
||||
compatiblePartitions(mtmv);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e);
|
||||
}
|
||||
try {
|
||||
// change table id to BaseTableInfo
|
||||
compatibleTables(mtmv);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void compatiblePartitions(MTMV mtmv) throws AnalysisException {
|
||||
if (!checkHasDataWithoutPartitionId()) {
|
||||
return;
|
||||
}
|
||||
OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) {
|
||||
MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
|
||||
if (versionSnapshot.getId() == 0) {
|
||||
Partition partition = relatedTable.getPartition(entry.getKey());
|
||||
if (partition != null) {
|
||||
(versionSnapshot).setId(partition.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkHasDataWithoutPartitionId() {
|
||||
for (MTMVSnapshotIf snapshot : partitions.values()) {
|
||||
if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void compatibleTables(MTMV mtmv) {
|
||||
if (tables.size() == tablesInfo.size()) {
|
||||
return;
|
||||
}
|
||||
@ -87,7 +130,7 @@ public class MTMVRefreshPartitionSnapshot {
|
||||
if (tableInfo.isPresent()) {
|
||||
tablesInfo.put(tableInfo.get(), entry.getValue());
|
||||
} else {
|
||||
LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(),
|
||||
LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(),
|
||||
relation.getBaseTablesOneLevel());
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,10 +24,30 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf {
|
||||
@SerializedName("v")
|
||||
private long version;
|
||||
|
||||
// The partition version after insert overwrite is 1,
|
||||
// which may cause the upper level materialized view to be unaware of changes in the data at the bottom level.
|
||||
// However, the partition ID after overwrite will change, so the partition ID should be added.
|
||||
// only for partition, table will always 0
|
||||
@SerializedName("id")
|
||||
private long id;
|
||||
|
||||
public MTMVVersionSnapshot(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public MTMVVersionSnapshot(long version, long id) {
|
||||
this.version = version;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
@ -37,18 +57,19 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf {
|
||||
return false;
|
||||
}
|
||||
MTMVVersionSnapshot that = (MTMVVersionSnapshot) o;
|
||||
return version == that.version;
|
||||
return version == that.version && id == that.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(version);
|
||||
return Objects.hashCode(version, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MTMVVersionSnapshot{"
|
||||
+ "version=" + version
|
||||
+ ", id=" + id
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user