pick from master #39355
This commit is contained in:
@ -159,6 +159,7 @@ public class Alter {
|
||||
|
||||
olapTable.checkNormalStateForAlter();
|
||||
boolean needProcessOutsideTableLock = false;
|
||||
String oldTableName = olapTable.getName();
|
||||
if (currentAlterOps.checkTableStoragePolicy(alterClauses)) {
|
||||
String tableStoragePolicy = olapTable.getStoragePolicy();
|
||||
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
|
||||
@ -283,7 +284,7 @@ public class Alter {
|
||||
throw new DdlException("Invalid alter operations: " + currentAlterOps);
|
||||
}
|
||||
if (needChangeMTMVState(alterClauses)) {
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(olapTable);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(olapTable, oldTableName);
|
||||
}
|
||||
return needProcessOutsideTableLock;
|
||||
}
|
||||
|
||||
@ -189,6 +189,7 @@ import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
|
||||
import org.apache.doris.mtmv.MTMVRelation;
|
||||
import org.apache.doris.mtmv.MTMVService;
|
||||
import org.apache.doris.mtmv.MTMVStatus;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mysql.authenticate.AuthenticateType;
|
||||
import org.apache.doris.mysql.authenticate.AuthenticatorManager;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
@ -1647,6 +1648,14 @@ public class Env {
|
||||
|
||||
auth.rectifyPrivs();
|
||||
catalogMgr.registerCatalogRefreshListener(this);
|
||||
// MTMV needs to be compatible with old metadata, and during the compatibility process,
|
||||
// it needs to wait for all catalog data to be ready, so it cannot be processed through gsonPostProcess()
|
||||
// We catch all possible exceptions to avoid FE startup failure
|
||||
try {
|
||||
MTMVUtil.compatibleMTMV(catalogMgr);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("compatibleMTMV failed", t);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.job.common.TaskStatus;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.mtmv.EnvInfo;
|
||||
@ -494,4 +495,21 @@ public class MTMV extends OlapTable {
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Previously, ID was used to store the related table of materialized views,
|
||||
* but when the catalog is deleted, the ID will change, so name is used instead.
|
||||
* The logic here is to be compatible with older versions by converting ID to name
|
||||
*/
|
||||
public void compatible(CatalogMgr catalogMgr) {
|
||||
if (mvPartitionInfo != null) {
|
||||
mvPartitionInfo.compatible(catalogMgr);
|
||||
}
|
||||
if (relation != null) {
|
||||
relation.compatible(catalogMgr);
|
||||
}
|
||||
if (refreshSnapshot != null) {
|
||||
refreshSnapshot.compatible(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,8 +17,10 @@
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
public class DataChangeEvent extends TableEvent {
|
||||
public DataChangeEvent(long ctlId, long dbId, long tableId) {
|
||||
public DataChangeEvent(long ctlId, long dbId, long tableId) throws AnalysisException {
|
||||
super(EventType.DATA_CHANGE, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,8 +17,10 @@
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
public class DropPartitionEvent extends TableEvent {
|
||||
public DropPartitionEvent(long ctlId, long dbId, long tableId) {
|
||||
public DropPartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException {
|
||||
super(EventType.DROP_PARTITION, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,8 +17,10 @@
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
public class ReplacePartitionEvent extends TableEvent {
|
||||
public ReplacePartitionEvent(long ctlId, long dbId, long tableId) {
|
||||
public ReplacePartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException {
|
||||
super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,16 +17,31 @@
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
|
||||
public abstract class TableEvent extends Event {
|
||||
protected final long ctlId;
|
||||
protected final String ctlName;
|
||||
protected final long dbId;
|
||||
protected final String dbName;
|
||||
protected final long tableId;
|
||||
protected final String tableName;
|
||||
|
||||
public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) {
|
||||
public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) throws AnalysisException {
|
||||
super(eventType);
|
||||
this.ctlId = ctlId;
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(ctlId);
|
||||
DatabaseIf db = catalog.getDbOrAnalysisException(dbId);
|
||||
TableIf table = db.getTableOrAnalysisException(tableId);
|
||||
this.ctlName = catalog.getName();
|
||||
this.dbName = db.getFullName();
|
||||
this.tableName = table.getName();
|
||||
}
|
||||
|
||||
public long getCtlId() {
|
||||
@ -41,12 +56,27 @@ public abstract class TableEvent extends Event {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public String getCtlName() {
|
||||
return ctlName;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TableEvent{"
|
||||
+ "ctlId=" + ctlId
|
||||
+ ", ctlName='" + ctlName + '\''
|
||||
+ ", dbId=" + dbId
|
||||
+ ", dbName='" + dbName + '\''
|
||||
+ ", tableId=" + tableId
|
||||
+ ", tableName='" + tableName + '\''
|
||||
+ "} " + super.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,39 +18,44 @@
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class BaseTableInfo {
|
||||
private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class);
|
||||
|
||||
// The MTMV needs to record the name to avoid changing the ID after rebuilding the same named base table,
|
||||
// which may make the materialized view unusable.
|
||||
// The previous version stored the ID, so it is temporarily kept for compatibility with the old version
|
||||
@SerializedName("ti")
|
||||
@Deprecated
|
||||
private long tableId;
|
||||
@SerializedName("di")
|
||||
@Deprecated
|
||||
private long dbId;
|
||||
@SerializedName("ci")
|
||||
@Deprecated
|
||||
private long ctlId;
|
||||
|
||||
public BaseTableInfo(long tableId, long dbId) {
|
||||
this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null");
|
||||
this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
|
||||
this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID;
|
||||
}
|
||||
|
||||
public BaseTableInfo(long tableId, long dbId, long ctlId) {
|
||||
this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null");
|
||||
this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
|
||||
this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null");
|
||||
}
|
||||
@SerializedName("tn")
|
||||
private String tableName;
|
||||
@SerializedName("dn")
|
||||
private String dbName;
|
||||
@SerializedName("cn")
|
||||
private String ctlName;
|
||||
|
||||
public BaseTableInfo(TableIf table) {
|
||||
java.util.Objects.requireNonNull(table, "table is null");
|
||||
DatabaseIf database = table.getDatabase();
|
||||
java.util.Objects.requireNonNull(database, "database is null");
|
||||
CatalogIf catalog = database.getCatalog();
|
||||
@ -58,20 +63,53 @@ public class BaseTableInfo {
|
||||
this.tableId = table.getId();
|
||||
this.dbId = database.getId();
|
||||
this.ctlId = catalog.getId();
|
||||
this.tableName = table.getName();
|
||||
this.dbName = database.getFullName();
|
||||
this.ctlName = catalog.getName();
|
||||
}
|
||||
|
||||
// for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog
|
||||
public BaseTableInfo(OlapTable table, long dbId) {
|
||||
java.util.Objects.requireNonNull(table, "table is null");
|
||||
this.tableId = table.getId();
|
||||
this.dbId = dbId;
|
||||
this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID;
|
||||
this.tableName = table.getName();
|
||||
this.dbName = table.getQualifiedDbName();
|
||||
this.ctlName = InternalCatalog.INTERNAL_CATALOG_NAME;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public String getCtlName() {
|
||||
return ctlName;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getCtlId() {
|
||||
return ctlId;
|
||||
}
|
||||
|
||||
public void setTableName(String tableName) {
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
@ -81,31 +119,43 @@ public class BaseTableInfo {
|
||||
return false;
|
||||
}
|
||||
BaseTableInfo that = (BaseTableInfo) o;
|
||||
return Objects.equal(tableId, that.tableId)
|
||||
&& Objects.equal(dbId, that.dbId)
|
||||
&& Objects.equal(ctlId, that.ctlId);
|
||||
// for compatibility
|
||||
if (StringUtils.isEmpty(ctlName) || StringUtils.isEmpty(that.ctlName)) {
|
||||
return Objects.equal(tableId, that.tableId) && Objects.equal(
|
||||
dbId, that.dbId) && Objects.equal(ctlId, that.ctlId);
|
||||
} else {
|
||||
return Objects.equal(tableName, that.tableName) && Objects.equal(
|
||||
dbName, that.dbName) && Objects.equal(ctlName, that.ctlName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(tableId, dbId, ctlId);
|
||||
return Objects.hashCode(tableName, dbName, ctlName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BaseTableInfo{"
|
||||
+ "tableId=" + tableId
|
||||
+ ", dbId=" + dbId
|
||||
+ ", ctlId=" + ctlId
|
||||
+ "tableName='" + tableName + '\''
|
||||
+ ", dbName='" + dbName + '\''
|
||||
+ ", ctlName='" + ctlName + '\''
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
public void compatible(CatalogMgr catalogMgr) {
|
||||
if (!StringUtils.isEmpty(ctlName)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
return MTMVUtil.getTable(this).getName();
|
||||
CatalogIf catalog = catalogMgr.getCatalogOrAnalysisException(ctlId);
|
||||
DatabaseIf db = catalog.getDbOrAnalysisException(dbId);
|
||||
TableIf table = db.getTableOrAnalysisException(tableId);
|
||||
this.ctlName = catalog.getName();
|
||||
this.dbName = db.getFullName();
|
||||
this.tableName = table.getName();
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("can not get table: " + this);
|
||||
return "";
|
||||
LOG.warn("MTMV compatible failed, ctlId: {}, dbId: {}, tableId: {}", ctlId, dbId, tableId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,7 +104,7 @@ public interface MTMVHookService {
|
||||
*
|
||||
* @param table
|
||||
*/
|
||||
void alterTable(Table table);
|
||||
void alterTable(Table table, String oldTableName);
|
||||
|
||||
/**
|
||||
* Triggered when pause mtmv
|
||||
|
||||
@ -188,7 +188,7 @@ public class MTMVJobManager implements MTMVHookService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void alterTable(Table table) {
|
||||
public void alterTable(Table table, String oldTableName) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
@ -149,4 +150,11 @@ public class MTMVPartitionInfo {
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
public void compatible(CatalogMgr catalogMgr) {
|
||||
if (relatedTable == null) {
|
||||
return;
|
||||
}
|
||||
relatedTable.compatible(catalogMgr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -447,7 +447,7 @@ public class MTMVPartitionUtil {
|
||||
}
|
||||
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context);
|
||||
return mtmv.getRefreshSnapshot()
|
||||
.equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot);
|
||||
.equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -495,8 +495,8 @@ public class MTMVPartitionUtil {
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
refreshPartitionSnapshot.getTables()
|
||||
.put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot(context));
|
||||
refreshPartitionSnapshot.addTableSnapshot(baseTableInfo,
|
||||
((MTMVRelatedTableIf) table).getTableSnapshot(context));
|
||||
}
|
||||
return refreshPartitionSnapshot;
|
||||
}
|
||||
|
||||
@ -17,35 +17,88 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVRefreshPartitionSnapshot {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMV.class);
|
||||
@SerializedName("p")
|
||||
private Map<String, MTMVSnapshotIf> partitions;
|
||||
// old version only persist table id, we need `BaseTableInfo`, `tables` only for compatible old version
|
||||
@SerializedName("t")
|
||||
@Deprecated
|
||||
private Map<Long, MTMVSnapshotIf> tables;
|
||||
@SerializedName("ti")
|
||||
private Map<BaseTableInfo, MTMVSnapshotIf> tablesInfo;
|
||||
|
||||
public MTMVRefreshPartitionSnapshot() {
|
||||
this.partitions = Maps.newConcurrentMap();
|
||||
this.tables = Maps.newConcurrentMap();
|
||||
this.tablesInfo = Maps.newConcurrentMap();
|
||||
}
|
||||
|
||||
public Map<String, MTMVSnapshotIf> getPartitions() {
|
||||
return partitions;
|
||||
}
|
||||
|
||||
public Map<Long, MTMVSnapshotIf> getTables() {
|
||||
return tables;
|
||||
public MTMVSnapshotIf getTableSnapshot(BaseTableInfo table) {
|
||||
if (tablesInfo.containsKey(table)) {
|
||||
return tablesInfo.get(table);
|
||||
}
|
||||
// for compatible old version
|
||||
return tables.get(table.getTableId());
|
||||
}
|
||||
|
||||
public void addTableSnapshot(BaseTableInfo baseTableInfo, MTMVSnapshotIf tableSnapshot) {
|
||||
tablesInfo.put(baseTableInfo, tableSnapshot);
|
||||
// for compatible old version
|
||||
tables.put(baseTableInfo.getTableId(), tableSnapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MTMVRefreshPartitionSnapshot{"
|
||||
+ "partitions=" + partitions
|
||||
+ ", tables=" + tables
|
||||
+ ", tablesInfo=" + tablesInfo
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public void compatible(MTMV mtmv) {
|
||||
if (tables.size() == tablesInfo.size()) {
|
||||
return;
|
||||
}
|
||||
MTMVRelation relation = mtmv.getRelation();
|
||||
if (relation == null || CollectionUtils.isEmpty(relation.getBaseTablesOneLevel())) {
|
||||
return;
|
||||
}
|
||||
for (Entry<Long, MTMVSnapshotIf> entry : tables.entrySet()) {
|
||||
Optional<BaseTableInfo> tableInfo = getByTableId(entry.getKey(),
|
||||
relation.getBaseTablesOneLevel());
|
||||
if (tableInfo.isPresent()) {
|
||||
tablesInfo.put(tableInfo.get(), entry.getValue());
|
||||
} else {
|
||||
LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(),
|
||||
relation.getBaseTablesOneLevel());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<BaseTableInfo> getByTableId(Long tableId, Set<BaseTableInfo> baseTables) {
|
||||
for (BaseTableInfo info : baseTables) {
|
||||
if (info.getTableId() == tableId) {
|
||||
return Optional.of(info);
|
||||
}
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -55,13 +57,13 @@ public class MTMVRefreshSnapshot {
|
||||
return partitionSnapshot.getPartitions().keySet();
|
||||
}
|
||||
|
||||
public boolean equalsWithBaseTable(String mtmvPartitionName, long baseTableId,
|
||||
public boolean equalsWithBaseTable(String mtmvPartitionName, BaseTableInfo tableInfo,
|
||||
MTMVSnapshotIf baseTableCurrentSnapshot) {
|
||||
MTMVRefreshPartitionSnapshot partitionSnapshot = partitionSnapshots.get(mtmvPartitionName);
|
||||
if (partitionSnapshot == null) {
|
||||
return false;
|
||||
}
|
||||
MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTables().get(baseTableId);
|
||||
MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTableSnapshot(tableInfo);
|
||||
if (relatedPartitionSnapshot == null) {
|
||||
return false;
|
||||
}
|
||||
@ -88,4 +90,13 @@ public class MTMVRefreshSnapshot {
|
||||
+ "partitionSnapshots=" + partitionSnapshots
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public void compatible(MTMV mtmv) {
|
||||
if (MapUtils.isEmpty(partitionSnapshots)) {
|
||||
return;
|
||||
}
|
||||
for (MTMVRefreshPartitionSnapshot snapshot : partitionSnapshots.values()) {
|
||||
snapshot.compatible(mtmv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,10 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
@ -61,4 +64,19 @@ public class MTMVRelation {
|
||||
+ ", baseViews=" + baseViews
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public void compatible(CatalogMgr catalogMgr) {
|
||||
compatible(catalogMgr, baseTables);
|
||||
compatible(catalogMgr, baseViews);
|
||||
compatible(catalogMgr, baseTablesOneLevel);
|
||||
}
|
||||
|
||||
private void compatible(CatalogMgr catalogMgr, Set<BaseTableInfo> infos) {
|
||||
if (CollectionUtils.isEmpty(infos)) {
|
||||
return;
|
||||
}
|
||||
for (BaseTableInfo baseTableInfo : infos) {
|
||||
baseTableInfo.compatible(catalogMgr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,7 +187,7 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
*/
|
||||
@Override
|
||||
public void registerMTMV(MTMV mtmv, Long dbId) {
|
||||
refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv.getId(), dbId));
|
||||
refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv, dbId));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -232,7 +232,7 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
*/
|
||||
@Override
|
||||
public void dropTable(Table table) {
|
||||
processBaseTableChange(table, "The base table has been deleted:");
|
||||
processBaseTableChange(new BaseTableInfo(table), "The base table has been deleted:");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -241,8 +241,10 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
* @param table
|
||||
*/
|
||||
@Override
|
||||
public void alterTable(Table table) {
|
||||
processBaseTableChange(table, "The base table has been updated:");
|
||||
public void alterTable(Table table, String oldTableName) {
|
||||
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
|
||||
baseTableInfo.setTableName(oldTableName);
|
||||
processBaseTableChange(baseTableInfo, "The base table has been updated:");
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -260,8 +262,7 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
|
||||
}
|
||||
|
||||
private void processBaseTableChange(Table table, String msgPrefix) {
|
||||
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
|
||||
private void processBaseTableChange(BaseTableInfo baseTableInfo, String msgPrefix) {
|
||||
Set<BaseTableInfo> mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo);
|
||||
if (CollectionUtils.isEmpty(mtmvsByBaseTable)) {
|
||||
return;
|
||||
@ -269,9 +270,7 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
for (BaseTableInfo mtmvInfo : mtmvsByBaseTable) {
|
||||
Table mtmv = null;
|
||||
try {
|
||||
mtmv = Env.getCurrentEnv().getInternalCatalog()
|
||||
.getDbOrAnalysisException(mtmvInfo.getDbId())
|
||||
.getTableOrAnalysisException(mtmvInfo.getTableId());
|
||||
mtmv = (Table) MTMVUtil.getTable(mtmvInfo);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn(e);
|
||||
continue;
|
||||
|
||||
@ -17,8 +17,10 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
@ -128,11 +130,11 @@ public class MTMVService implements EventListener {
|
||||
}
|
||||
}
|
||||
|
||||
public void alterTable(Table table) {
|
||||
public void alterTable(Table table, String oldTableName) {
|
||||
Objects.requireNonNull(table);
|
||||
LOG.info("alterTable, tableName: {}", table.getName());
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
mtmvHookService.alterTable(table);
|
||||
mtmvHookService.alterTable(table, oldTableName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -177,12 +179,21 @@ public class MTMVService implements EventListener {
|
||||
}
|
||||
TableEvent tableEvent = (TableEvent) event;
|
||||
LOG.info("processEvent, Event: {}", event);
|
||||
TableIf table;
|
||||
try {
|
||||
table = Env.getCurrentEnv().getCatalogMgr()
|
||||
.getCatalogOrAnalysisException(tableEvent.getCtlName())
|
||||
.getDbOrAnalysisException(tableEvent.getDbName())
|
||||
.getTableOrAnalysisException(tableEvent.getTableName());
|
||||
} catch (AnalysisException e) {
|
||||
throw new EventException(e);
|
||||
}
|
||||
Set<BaseTableInfo> mtmvs = relationManager.getMtmvsByBaseTableOneLevel(
|
||||
new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId()));
|
||||
new BaseTableInfo(table));
|
||||
for (BaseTableInfo baseTableInfo : mtmvs) {
|
||||
try {
|
||||
// check if mtmv should trigger by event
|
||||
MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId());
|
||||
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
|
||||
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
|
||||
jobManager.onCommit(mtmv);
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
|
||||
@ -37,6 +38,7 @@ import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -52,11 +54,18 @@ public class MTMVUtil {
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException {
|
||||
TableIf table = Env.getCurrentEnv().getCatalogMgr()
|
||||
.getCatalogOrAnalysisException(baseTableInfo.getCtlId())
|
||||
.getDbOrAnalysisException(baseTableInfo.getDbId())
|
||||
.getTableOrAnalysisException(baseTableInfo.getTableId());
|
||||
return table;
|
||||
// for compatible old version, not have name
|
||||
if (StringUtils.isEmpty(baseTableInfo.getCtlName())) {
|
||||
return Env.getCurrentEnv().getCatalogMgr()
|
||||
.getCatalogOrAnalysisException(baseTableInfo.getCtlId())
|
||||
.getDbOrAnalysisException(baseTableInfo.getDbId())
|
||||
.getTableOrAnalysisException(baseTableInfo.getTableId());
|
||||
} else {
|
||||
return Env.getCurrentEnv().getCatalogMgr()
|
||||
.getCatalogOrAnalysisException(baseTableInfo.getCtlName())
|
||||
.getDbOrAnalysisException(baseTableInfo.getDbName())
|
||||
.getTableOrAnalysisException(baseTableInfo.getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) {
|
||||
@ -87,7 +96,7 @@ public class MTMVUtil {
|
||||
public static boolean mtmvContainsExternalTable(MTMV mtmv) {
|
||||
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTablesOneLevel();
|
||||
for (BaseTableInfo baseTableInfo : baseTables) {
|
||||
if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) {
|
||||
if (!baseTableInfo.getCtlName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -151,4 +160,16 @@ public class MTMVUtil {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void compatibleMTMV(CatalogMgr catalogMgr) {
|
||||
List<Database> dbs = catalogMgr.getInternalCatalog().getDbs();
|
||||
for (Database database : dbs) {
|
||||
List<Table> tables = database.getTables();
|
||||
for (Table table : tables) {
|
||||
if (table instanceof MTMV) {
|
||||
((MTMV) table).compatible(catalogMgr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,6 +61,6 @@ public class AlterMTMVRenameInfo extends AlterMTMVInfo {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
|
||||
Table table = db.getTableOrDdlException(mvName.getTbl());
|
||||
Env.getCurrentEnv().renameTable(db, table, newName);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(table);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(table, mvName.getTbl());
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,9 +90,9 @@ public class AlterMTMVReplaceInfo extends AlterMTMVInfo {
|
||||
MTMV mtmv = (MTMV) db.getTableOrDdlException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
MTMV newMtmv = (MTMV) db.getTableOrDdlException(newName, TableType.MATERIALIZED_VIEW);
|
||||
Env.getCurrentEnv().getAlterInstance().processReplaceTable(db, mtmv, newName, swapTable);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(newMtmv);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(newMtmv, mvName.getTbl());
|
||||
if (swapTable) {
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(mtmv);
|
||||
Env.getCurrentEnv().getMtmvService().alterTable(mtmv, newName);
|
||||
} else {
|
||||
Env.getCurrentEnv().getMtmvService().dropMTMV(mtmv);
|
||||
Env.getCurrentEnv().getMtmvService().dropTable(mtmv);
|
||||
|
||||
@ -1103,7 +1103,7 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
|
||||
private void produceEvent(TransactionState transactionState, Database db) {
|
||||
private void produceEvent(TransactionState transactionState, Database db) throws AnalysisException {
|
||||
Collection<TableCommitInfo> tableCommitInfos = transactionState.getIdToTableCommitInfos().values();
|
||||
for (TableCommitInfo tableCommitInfo : tableCommitInfos) {
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
|
||||
Reference in New Issue
Block a user