[feature](mtmv)mtmv support partition (#28144)

- create MTMV support partition and `AUTO` refresh method
- refresh mtmv support support specified partitions
- MTMV support incremental updates
- add property `EXCLUDED_TRIGGER_TABLES` for mv
- Maintain MTMVCache after successful task refresh for plan rewrite(MTMV.getOrGenerateCache)
- show partitions add "SyncWithBaseTables"
- drop job before drop MTMV
- task tvf add "MvId,MvDatabaseId,ErrorMsg,TaskContext,RefreshMode,RefreshPartitions"
- add `NotAllowFallback` for mtmv not fallback to old planner
- add `MTMVUtils.getMTMVCanRewritePartitions() `and `Env.getCurrentEnv().getMtmvService().getRelationManager().getAvailableMTMVs()` for plan rewrite
This commit is contained in:
zhangdong
2023-12-17 18:28:03 +08:00
committed by GitHub
parent 03e989b342
commit 0f3c544260
38 changed files with 1431 additions and 263 deletions

View File

@ -20,7 +20,9 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Index;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import java.util.ArrayList;
import java.util.List;
@ -31,17 +33,21 @@ public class CreateMTMVStmt extends CreateTableStmt {
private final String querySql;
private final EnvInfo envInfo;
private Map<String, String> mvProperties;
private MTMVPartitionInfo mvPartitionInfo;
private MTMVRelation relation;
public CreateMTMVStmt(boolean ifNotExists, TableName mvName, List<Column> columns,
MTMVRefreshInfo refreshInfo, KeysDesc keyDesc, DistributionDesc distributionDesc,
Map<String, String> properties, Map<String, String> mvProperties, String querySql, String comment,
EnvInfo envInfo) {
super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, null,
EnvInfo envInfo, PartitionDesc partitionDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelation relation) {
super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, partitionDesc,
distributionDesc, properties, null, comment, null, null);
this.refreshInfo = refreshInfo;
this.querySql = querySql;
this.envInfo = envInfo;
this.mvProperties = mvProperties;
this.mvPartitionInfo = mvPartitionInfo;
this.relation = relation;
}
public MTMVRefreshInfo getRefreshInfo() {
@ -59,4 +65,12 @@ public class CreateMTMVStmt extends CreateTableStmt {
public Map<String, String> getMvProperties() {
return mvProperties;
}
public MTMVPartitionInfo getMvPartitionInfo() {
return mvPartitionInfo;
}
public MTMVRelation getRelation() {
return relation;
}
}

View File

@ -18,21 +18,25 @@
package org.apache.doris.catalog;
import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MVCache;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -41,6 +45,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -62,8 +67,10 @@ public class MTMV extends OlapTable {
private Map<String, String> mvProperties;
@SerializedName("r")
private MTMVRelation relation;
// Should update after every fresh
private MVCache mvCache;
@SerializedName("mpi")
private MTMVPartitionInfo mvPartitionInfo;
// Should update after every fresh, not persist
private MTMVCache cache;
// For deserialization
public MTMV() {
@ -87,6 +94,8 @@ public class MTMV extends OlapTable {
this.status = new MTMVStatus();
this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX + params.tableId);
this.mvProperties = params.mvProperties;
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
mvRwLock = new ReentrantReadWriteLock(true);
}
@ -119,12 +128,12 @@ public class MTMV extends OlapTable {
return relation;
}
public MVCache getMvCache() {
return mvCache;
public MTMVCache getCache() {
return cache;
}
public void setMvCache(MVCache mvCache) {
this.mvCache = mvCache;
public void setCache(MTMVCache cache) {
this.cache = cache;
}
public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
@ -148,6 +157,12 @@ public class MTMV extends OlapTable {
this.status.setSchemaChangeDetail(null);
this.status.setRefreshState(MTMVRefreshState.SUCCESS);
this.relation = relation;
try {
this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this));
} catch (Throwable e) {
this.cache = null;
LOG.warn("generate cache failed", e);
}
} else {
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
@ -170,10 +185,36 @@ public class MTMV extends OlapTable {
}
}
public Set<String> getExcludedTriggerTables() {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
}
String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
return Sets.newHashSet(split);
}
public MTMVCache getOrGenerateCache() throws AnalysisException {
if (cache == null) {
writeMvLock();
try {
if (cache == null) {
this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this));
}
} finally {
writeMvUnlock();
}
}
return cache;
}
public Map<String, String> getMvProperties() {
return mvProperties;
}
public MTMVPartitionInfo getMvPartitionInfo() {
return mvPartitionInfo;
}
public void readMvLock() {
this.mvRwLock.readLock().lock();
}
@ -207,6 +248,7 @@ public class MTMV extends OlapTable {
jobInfo = materializedView.jobInfo;
mvProperties = materializedView.mvProperties;
relation = materializedView.relation;
mvPartitionInfo = materializedView.mvPartitionInfo;
}
}

View File

@ -947,6 +947,17 @@ public class OlapTable extends Table {
return getPartition(partitionName, true);
}
public Partition getPartitionOrAnalysisException(String partitionName) throws AnalysisException {
Partition partition = getPartition(partitionName, false);
if (partition == null) {
partition = getPartition(partitionName, true);
}
if (partition == null) {
throw new AnalysisException("partition not found: " + partitionName);
}
return partition;
}
// get partition by name
public Partition getPartition(String partitionName, boolean isTempPartition) {
if (isTempPartition) {
@ -965,6 +976,17 @@ public class OlapTable extends Table {
return partition;
}
public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException {
Partition partition = idToPartition.get(partitionId);
if (partition == null) {
partition = tempPartitions.getPartition(partitionId);
}
if (partition == null) {
throw new AnalysisException("partition not found: " + partitionId);
}
return partition;
}
// select the non-empty partition ids belonging to this table.
//
// ATTN: partitions not belonging to this table will be filtered.

View File

@ -22,7 +22,9 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import com.google.common.base.Preconditions;
@ -49,6 +51,8 @@ public class OlapTableFactory {
public EnvInfo envInfo;
public String querySql;
public Map<String, String> mvProperties;
public MTMVPartitionInfo mvPartitionInfo;
public MTMVRelation relation;
}
private BuildParams params;
@ -158,6 +162,22 @@ public class OlapTableFactory {
return this;
}
private OlapTableFactory withMvPartitionInfo(MTMVPartitionInfo mvPartitionInfo) {
Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MTMVParams mtmvParams = (MTMVParams) params;
mtmvParams.mvPartitionInfo = mvPartitionInfo;
return this;
}
private OlapTableFactory withMvRelation(MTMVRelation relation) {
Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MTMVParams mtmvParams = (MTMVParams) params;
mtmvParams.relation = relation;
return this;
}
public OlapTableFactory withExtraParams(DdlStmt stmt) {
boolean isMaterializedView = stmt instanceof CreateMTMVStmt;
if (!isMaterializedView) {
@ -168,6 +188,8 @@ public class OlapTableFactory {
return withRefreshInfo(createMTMVStmt.getRefreshInfo())
.withQuerySql(createMTMVStmt.getQuerySql())
.withMvProperties(createMTMVStmt.getMvProperties())
.withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo())
.withMvRelation(createMTMVStmt.getRelation())
.withEnvInfo(createMTMVStmt.getEnvInfo());
}
}

View File

@ -168,6 +168,18 @@ public class Partition extends MetaObject implements Writable {
return visibleVersionTime;
}
/**
* if visibleVersion is 1, do not return creation time but 0
*
* @return
*/
public long getVisibleVersionTimeIgnoreInit() {
if (visibleVersion == 1) {
return 0L;
}
return visibleVersionTime;
}
// The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated
private void setVisibleVersion(long visibleVersion) {
this.visibleVersion = visibleVersion;

View File

@ -147,6 +147,17 @@ public class PartitionInfo implements Writable {
return item;
}
public PartitionItem getItemOrAnalysisException(long partitionId) throws AnalysisException {
PartitionItem item = idToItem.get(partitionId);
if (item == null) {
item = idToTempItem.get(partitionId);
}
if (item == null) {
throw new AnalysisException("PartitionItem not found: " + partitionId);
}
return item;
}
public void setItem(long partitionId, boolean isTemp, PartitionItem item) {
setItemInternal(partitionId, isTemp, item);
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
@ -37,17 +38,20 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVUtil;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
@ -68,7 +72,7 @@ public class PartitionsProcDir implements ProcDirInterface {
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy")
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
.add("IsMutable")
.add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables")
.build();
private Database db;
@ -303,6 +307,20 @@ public class PartitionsProcDir implements ProcDirInterface {
partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt());
partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
if (olapTable instanceof MTMV) {
try {
List<String> partitionUnSyncTables = MTMVUtil
.getPartitionUnSyncTables((MTMV) olapTable, partitionId);
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
partitionInfo.add(partitionUnSyncTables.toString());
} catch (AnalysisException e) {
partitionInfo.add(false);
partitionInfo.add(e.getMessage());
}
} else {
partitionInfo.add(true);
partitionInfo.add(FeConstants.null_string);
}
partitionInfos.add(partitionInfo);
}

View File

@ -158,6 +158,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
"enable_duplicate_without_keys_by_default";
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
// For unique key data model, the feature Merge-on-Write will leverage a primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
// which can avoid the merging cost in read stage, and accelerate the aggregation

View File

@ -886,6 +886,9 @@ public class InternalCatalog implements CatalogIf<Database> {
+ " please use \"DROP table FORCE\".");
}
}
if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
}
unprotectDropTable(db, table, stmt.isForceDrop(), false, 0);
if (!stmt.isForceDrop()) {
recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId());
@ -898,9 +901,6 @@ public class InternalCatalog implements CatalogIf<Database> {
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
}
Env.getCurrentEnv().getMtmvService().dropTable(table);
} catch (UserException e) {
throw new DdlException(e.getMessage(), e);

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
@ -39,7 +40,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -47,9 +47,8 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MTMVJob extends AbstractJob<MTMVTask, Map> {
public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
@ -98,6 +97,9 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
@SerializedName(value = "mi")
private long mtmvId;
public MTMVJob() {
}
public MTMVJob(long dbId, long mtmvId) {
this.dbId = dbId;
this.mtmvId = mtmvId;
@ -110,8 +112,11 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
}
@Override
public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
MTMVTask task = new MTMVTask(dbId, mtmvId);
public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext taskContext) {
if (taskContext == null) {
taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
}
MTMVTask task = new MTMVTask(dbId, mtmvId, taskContext);
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
@ -119,9 +124,25 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
return tasks;
}
/**
* if user trigger, return true
* if system trigger, Check if there are any system triggered tasks, and if so, return false
*
* @param taskContext
* @return
*/
@Override
public boolean isReadyForScheduling(Map taskContext) {
return CollectionUtils.isEmpty(getRunningTasks());
public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
if (taskContext != null) {
return true;
}
List<MTMVTask> runningTasks = getRunningTasks();
for (MTMVTask task : runningTasks) {
if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
return false;
}
}
return true;
}
@Override

View File

@ -17,24 +17,26 @@
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVCacheManager;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
@ -44,10 +46,18 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
public class MTMVTask extends AbstractTask {
@ -58,12 +68,17 @@ public class MTMVTask extends AbstractTask {
new Column("TaskId", ScalarType.createStringType()),
new Column("JobId", ScalarType.createStringType()),
new Column("JobName", ScalarType.createStringType()),
new Column("MvId", ScalarType.createStringType()),
new Column("MvDatabaseId", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("DurationMs", ScalarType.createStringType()),
new Column("ExecuteSql", ScalarType.createStringType()));
new Column("TaskContext", ScalarType.createStringType()),
new Column("RefreshMode", ScalarType.createStringType()),
new Column("RefreshPartitions", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@ -75,34 +90,66 @@ public class MTMVTask extends AbstractTask {
COLUMN_TO_INDEX = builder.build();
}
public enum MTMVTaskTriggerMode {
MANUAL,
SYSTEM
}
public enum MTMVTaskRefreshMode {
COMPLETE,
PARTITION,
NOT_REFRESH
}
@SerializedName(value = "di")
private long dbId;
@SerializedName(value = "mi")
private long mtmvId;
@SerializedName("sql")
private String sql;
@SerializedName("taskContext")
private MTMVTaskContext taskContext;
@SerializedName("refreshPartitions")
List<String> refreshPartitions;
@SerializedName("refreshMode")
MTMVTaskRefreshMode refreshMode;
private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
private Set<Long> refreshPartitionIds = Sets.newHashSet();
public MTMVTask(long dbId, long mtmvId) {
this.dbId = dbId;
this.mtmvId = mtmvId;
public MTMVTask() {
}
public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) {
this.dbId = Objects.requireNonNull(dbId);
this.mtmvId = Objects.requireNonNull(mtmvId);
this.taskContext = Objects.requireNonNull(taskContext);
}
@Override
public void run() throws JobException {
try {
ConnectContext ctx = createContext();
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
TUniqueId queryId = generateQueryId();
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
relation = MTMVCacheManager.generateMTMVRelation(mtmv, ctx);
executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
calculateRefreshInfo();
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
} else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
}
refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds);
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, refreshPartitionIds, tableWithPartKey);
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setQueryId(queryId);
command.run(ctx, executor);
} catch (Throwable e) {
LOG.warn(e);
LOG.warn("run task failed: ", e);
throw new JobException(e);
}
}
@ -134,9 +181,12 @@ public class MTMVTask extends AbstractTask {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
mtmv = (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
sql = generateSql(mtmv);
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
MTMVUtil.alignMvPartition(mtmv, relatedTable);
}
} catch (UserException e) {
LOG.warn(e);
LOG.warn("before task failed:", e);
throw new JobException(e);
}
}
@ -147,46 +197,27 @@ public class MTMVTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId)));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
trow.addToColumnValue(new TCell()
.setStringVal(super.getStatus() == null ? FeConstants.null_string : super.getStatus().toString()));
trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(
(super.getFinishTimeMs() == null || super.getFinishTimeMs() == 0) ? FeConstants.null_string
: String.valueOf(super.getFinishTimeMs() - super.getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(sql));
trow.addToColumnValue(new TCell()
.setStringVal(taskContext == null ? FeConstants.null_string : new Gson().toJson(taskContext)));
trow.addToColumnValue(
new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString()));
trow.addToColumnValue(
new TCell().setStringVal(
refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions)));
return trow;
}
private static String generateSql(MTMV mtmv) {
StringBuilder builder = new StringBuilder();
builder.append("INSERT OVERWRITE TABLE ");
builder.append(mtmv.getDatabase().getCatalog().getName());
builder.append(".");
builder.append(ClusterNamespace.getNameFromFullName(mtmv.getQualifiedDbName()));
builder.append(".");
builder.append(mtmv.getName());
builder.append(" ");
builder.append(mtmv.getQuerySql());
return builder.toString();
}
private ConnectContext createContext() throws AnalysisException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
ctx.setThreadLocalInfo();
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
ctx.changeDefaultCatalog(catalog.getName());
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
return ctx;
}
private TUniqueId generateQueryId() {
UUID taskId = UUID.randomUUID();
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
@ -198,5 +229,55 @@ public class MTMVTask extends AbstractTask {
mtmv = null;
relation = null;
executor = null;
refreshPartitionIds = null;
}
private void calculateRefreshInfo() throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
} else if (!CollectionUtils
.isEmpty(taskContext.getPartitions())) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
return;
}
}
// check if data is fresh
Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L);
if (fresh) {
this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
return;
}
// current, if partitionType is SELF_MANAGE, we can only FULL refresh
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
}
// if refreshMethod is COMPLETE, we only FULL refresh
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
}
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
excludedTriggerTables.add(relatedTable.getName());
// check if every table except relatedTable is fresh
Set<Long> mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
// if true, we can use `Partition`, otherwise must `FULL`
if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = mtmvNeedRefreshPartitions;
return;
} else {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
}
}
public MTMVTaskContext getTaskContext() {
return taskContext;
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import com.google.gson.annotations.SerializedName;
import java.util.List;
public class MTMVTaskContext {
@SerializedName(value = "triggerMode")
private MTMVTaskTriggerMode triggerMode;
@SerializedName(value = "partitions")
private List<String> partitions;
@SerializedName(value = "isComplete")
private boolean isComplete;
public MTMVTaskContext(MTMVTaskTriggerMode triggerMode) {
this.triggerMode = triggerMode;
}
public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String> partitions, boolean isComplete) {
this.triggerMode = triggerMode;
this.partitions = partitions;
this.isComplete = isComplete;
}
public List<String> getPartitions() {
return partitions;
}
public MTMVTaskTriggerMode getTriggerMode() {
return triggerMode;
}
public boolean isComplete() {
return isComplete;
}
}

View File

@ -120,6 +120,7 @@ public abstract class AbstractTask implements Task {
run();
onSuccess();
} catch (Exception e) {
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
}

View File

@ -19,13 +19,18 @@ package org.apache.doris.mtmv;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BaseTableInfo {
private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class);
@SerializedName("ti")
private Long tableId;
@SerializedName("di")
@ -88,4 +93,13 @@ public class BaseTableInfo {
+ ", ctlId=" + ctlId
+ '}';
}
public String getTableName() {
try {
return MTMVUtil.getTable(this).getName();
} catch (AnalysisException e) {
LOG.warn("can not get table: " + this);
return "";
}
}
}

View File

@ -36,14 +36,14 @@ import java.util.stream.Collectors;
/**
* The cache for materialized view cache
*/
public class MVCache {
public class MTMVCache {
// the materialized view plan which should be optimized by the same rules to query
private final Plan logicalPlan;
// this should be shuttle expression with lineage
private final List<NamedExpression> mvOutputExpressions;
public MVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
public MTMVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}
@ -56,12 +56,12 @@ public class MVCache {
return mvOutputExpressions;
}
public MVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}
public static MVCache from(MTMV mtmv, ConnectContext connectContext) {
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// TODO: connect context set current db when create mv by use database
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
@ -77,6 +77,6 @@ public class MVCache {
List<NamedExpression> mvOutputExpressions = mvRewrittenPlan.getExpressions().stream()
.map(NamedExpression.class::cast)
.collect(Collectors.toList());
return new MVCache(mvPlan, mvOutputExpressions);
return new MTMVCache(mvPlan, mvOutputExpressions);
}
}

View File

@ -33,6 +33,8 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@ -48,7 +50,7 @@ import java.util.List;
* when do some operation, do something about job
*/
public class MTMVJobManager implements MTMVHookService {
public static final String MTMV_JOB_PREFIX = "mtmv_";
public static final String MTMV_JOB_PREFIX = "inner_mtmv_";
/**
* create MTMVJob
@ -174,7 +176,10 @@ public class MTMVJobManager implements MTMVHookService {
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
}
try {
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
info.isComplete());
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), mtmvTaskContext);
} catch (JobException e) {
e.printStackTrace();
throw new DdlException(e.getMessage());

View File

@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.gson.annotations.SerializedName;
/**
* MTMVPartitionInfo
*/
public class MTMVPartitionInfo {
public enum MTMVPartitionType {
FOLLOW_BASE_TABLE,
SELF_MANAGE
}
@SerializedName("pt")
MTMVPartitionType partitionType;
@SerializedName("rt")
BaseTableInfo relatedTable;
@SerializedName("rc")
String relatedCol;
@SerializedName("pc")
String partitionCol;
public MTMVPartitionInfo() {
}
public MTMVPartitionInfo(MTMVPartitionType partitionType) {
this.partitionType = partitionType;
}
public MTMVPartitionInfo(MTMVPartitionType partitionType,
String partitionCol) {
this.partitionType = partitionType;
this.partitionCol = partitionCol;
}
public MTMVPartitionType getPartitionType() {
return partitionType;
}
public void setPartitionType(MTMVPartitionType partitionType) {
this.partitionType = partitionType;
}
public BaseTableInfo getRelatedTable() {
return relatedTable;
}
public void setRelatedTable(BaseTableInfo relatedTable) {
this.relatedTable = relatedTable;
}
public String getRelatedCol() {
return relatedCol;
}
public void setRelatedCol(String relatedCol) {
this.relatedCol = relatedCol;
}
public String getPartitionCol() {
return partitionCol;
}
public void setPartitionCol(String partitionCol) {
this.partitionCol = partitionCol;
}
@Override
public String toString() {
return "MTMVPartitionInfo{"
+ "partitionType=" + partitionType
+ ", relatedTable=" + relatedTable
+ ", relatedCol='" + relatedCol + '\''
+ ", partitionCol='" + partitionCol + '\''
+ '}';
}
public String toNameString() {
if (partitionType == MTMVPartitionType.SELF_MANAGE) {
return "MTMVPartitionInfo{"
+ "partitionType=" + partitionType
+ '}';
} else {
return "MTMVPartitionInfo{"
+ "partitionType=" + partitionType
+ ", relatedTable=" + relatedTable.getTableName()
+ ", relatedCol='" + relatedCol + '\''
+ ", partitionCol='" + partitionCol + '\''
+ '}';
}
}
}

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import java.util.List;
import java.util.Set;
public class MTMVPlanUtil {
public static ConnectContext createMTMVContext(MTMV mtmv) throws AnalysisException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
ctx.setThreadLocalInfo();
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
ctx.changeDefaultCatalog(catalog.getName());
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
return ctx;
}
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
return generateMTMVRelation(plan);
}
public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
}
private static Set<BaseTableInfo> getBaseTables(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}
private static Set<BaseTableInfo> getBaseViews(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets.newHashSet(TableType.VIEW));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}
private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> tables) {
Set<BaseTableInfo> result = com.google.common.collect.Sets.newHashSet();
for (TableIf table : tables) {
result.add(new BaseTableInfo(table));
}
return result;
}
private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
} catch (Exception e) {
throw new ParseException("Nereids parse failed. " + e.getMessage());
}
StatementBase parsedStmt = statements.get(0);
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
}
}

View File

@ -26,7 +26,8 @@ public class MTMVRefreshEnum {
* RefreshMethod
*/
public enum RefreshMethod {
COMPLETE //complete
COMPLETE, //complete
AUTO //try to update incrementally, if not possible, update in full
}
/**

View File

@ -17,38 +17,19 @@
package org.apache.doris.mtmv;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
@ -63,126 +44,34 @@ import java.util.Set;
/**
* when do some operation, do something about cache
*/
public class MTMVCacheManager implements MTMVHookService {
private static final Logger LOG = LogManager.getLogger(MTMVCacheManager.class);
public class MTMVRelationManager implements MTMVHookService {
private static final Logger LOG = LogManager.getLogger(MTMVRelationManager.class);
private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs = Maps.newConcurrentMap();
public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) {
return tableMTMVs.get(table);
}
// TODO Implement the method which getting materialized view by tables
public List<MTMV> getAvailableMaterializedView(List<BaseTableInfo> tables) {
return ImmutableList.of();
}
public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws AnalysisException, DdlException {
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMvRewrite()) {
return false;
}
MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return false;
}
// chaek mv is normal
if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
return false;
}
// check external table
boolean containsExternalTable = containsExternalTable(mtmvRelation.getBaseTables());
if (containsExternalTable) {
return ctx.getSessionVariable().isEnableExternalMvRewrite();
}
// check gracePeriod
Long gracePeriod = mtmv.getGracePeriod();
// do not care data is delayed
if (gracePeriod < 0) {
return true;
}
// compare with base table
Long mtmvLastTime = getTableLastVisibleVersionTime(mtmv);
Long maxAvailableTime = mtmvLastTime + gracePeriod;
for (BaseTableInfo baseTableInfo : mtmvRelation.getBaseTables()) {
long tableLastVisibleVersionTime = getTableLastVisibleVersionTime(baseTableInfo);
if (tableLastVisibleVersionTime > maxAvailableTime) {
return false;
public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos) {
Set<MTMV> res = Sets.newHashSet();
Set<BaseTableInfo> mvInfos = getAvailableMTMVInfos(tableInfos);
for (BaseTableInfo tableInfo : mvInfos) {
try {
res.add((MTMV) MTMVUtil.getTable(tableInfo));
} catch (AnalysisException e) {
// not throw exception to client, just ignore it
LOG.warn("getTable failed: {}", tableInfo.toString(), e);
}
}
return true;
return res;
}
private long getTableLastVisibleVersionTime(BaseTableInfo baseTableInfo) throws AnalysisException, DdlException {
Table table = Env.getCurrentEnv().getInternalCatalog()
.getDbOrAnalysisException(baseTableInfo.getDbId())
.getTableOrDdlException(baseTableInfo.getTableId(), TableType.OLAP);
return getTableLastVisibleVersionTime((OlapTable) table);
}
private long getTableLastVisibleVersionTime(OlapTable table) {
long result = 0L;
long visibleVersionTime;
for (Partition partition : table.getAllPartitions()) {
visibleVersionTime = partition.getVisibleVersionTime();
if (visibleVersionTime > result) {
result = visibleVersionTime;
}
public Set<BaseTableInfo> getAvailableMTMVInfos(List<BaseTableInfo> tableInfos) {
Set<BaseTableInfo> mvInfos = Sets.newHashSet();
for (BaseTableInfo tableInfo : tableInfos) {
mvInfos.addAll(getMtmvsByBaseTable(tableInfo));
}
return result;
}
private boolean containsExternalTable(Set<BaseTableInfo> baseTableInfos) {
for (BaseTableInfo baseTableInfo : baseTableInfos) {
if (InternalCatalog.INTERNAL_CATALOG_ID != baseTableInfo.getCtlId()) {
return true;
}
}
return false;
}
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
}
private static Set<BaseTableInfo> getBaseTables(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}
private static Set<BaseTableInfo> getBaseViews(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
Sets.newHashSet(TableType.VIEW));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}
private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> tables) {
Set<BaseTableInfo> result = Sets.newHashSet();
for (TableIf table : tables) {
result.add(new BaseTableInfo(table));
}
return result;
}
private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
} catch (Exception e) {
throw new ParseException("Nereids parse failed. " + e.getMessage());
}
StatementBase parsedStmt = statements.get(0);
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return mvInfos;
}
private Set<BaseTableInfo> getOrCreateMTMVs(BaseTableInfo baseTableInfo) {
@ -233,6 +122,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* modify `tableMTMVs` by MTMVRelation
*
* @param mtmv
* @param dbId
*/
@ -243,6 +133,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* remove cache of mtmv
*
* @param mtmv
*/
@Override
@ -262,6 +153,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* modify `tableMTMVs` by MTMVRelation
*
* @param mtmv
* @param relation
* @param task
@ -276,6 +168,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* update mtmv status to `SCHEMA_CHANGE`
*
* @param table
*/
@Override
@ -285,6 +178,7 @@ public class MTMVCacheManager implements MTMVHookService {
/**
* update mtmv status to `SCHEMA_CHANGE`
*
* @param table
*/
@Override

View File

@ -18,10 +18,13 @@
package org.apache.doris.mtmv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.persist.AlterMTMV;
@ -36,16 +39,16 @@ public class MTMVService {
private static final Logger LOG = LogManager.getLogger(MTMVService.class);
private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
private MTMVCacheManager cacheManager = new MTMVCacheManager();
private MTMVRelationManager relationManager = new MTMVRelationManager();
private MTMVJobManager jobManager = new MTMVJobManager();
public MTMVService() {
registerHook("MTMVJobManager", jobManager);
registerHook("MTMVCacheManager", cacheManager);
registerHook("MTMVRelationManager", relationManager);
}
public MTMVCacheManager getCacheManager() {
return cacheManager;
public MTMVRelationManager getRelationManager() {
return relationManager;
}
public void registerHook(String name, MTMVHookService mtmvHookService) {
@ -76,8 +79,12 @@ public class MTMVService {
}
}
public void createMTMV(MTMV mtmv) throws DdlException {
public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
Objects.requireNonNull(mtmv);
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
MTMVUtil.alignMvPartition(mtmv, relatedTable);
}
LOG.info("createMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.createMTMV(mtmv);

View File

@ -0,0 +1,489 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
public class MTMVUtil {
private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
/**
* get Table by BaseTableInfo
*
* @param baseTableInfo
* @return
* @throws AnalysisException
*/
public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException {
TableIf table = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(baseTableInfo.getCtlId())
.getDbOrAnalysisException(baseTableInfo.getDbId())
.getTableOrAnalysisException(baseTableInfo.getTableId());
return table;
}
/**
* Determine whether the mtmv is sync with tables
*
* @param mtmv
* @param tables
* @param excludedTriggerTables
* @param gracePeriod
* @return
*/
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) {
return isSync(getTableMinVisibleVersionTime(mtmv), tables, excludedTriggerTables, gracePeriod);
}
/**
* Determine whether the partition is sync with retated partition and other baseTables
*
* @param mtmv
* @param partitionId
* @param tables
* @param excludedTriggerTables
* @param gracePeriod
* @return
* @throws AnalysisException
*/
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
OlapTable relatedTable = (OlapTable) getTable(mtmv.getMvPartitionInfo().getRelatedTable());
// if follow base table, not need compare with related table, only should compare with related partition
excludedTriggerTables.add(relatedTable.getName());
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
long relatedPartitionId = getExistPartitionId(item,
relatedTable.getPartitionInfo().getIdToItem(false));
if (relatedPartitionId == -1L) {
LOG.warn("can not found related partition: " + partitionId);
return false;
}
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
}
return isSyncWithPartition && isSync(
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(), tables,
excludedTriggerTables, gracePeriod);
}
/**
* Align the partitions of mtmv and related tables, delete more and add less
*
* @param mtmv
* @param relatedTable
* @throws DdlException
* @throws AnalysisException
*/
public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable)
throws DdlException, AnalysisException {
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false);
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false);
// drop partition of mtmv
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
if (partitionId == -1L) {
dropPartition(mtmv, entry.getKey());
}
}
// add partition for mtmv
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
if (partitionId == -1L) {
addPartition(mtmv, relatedTable, entry.getKey());
}
}
}
/**
* get mv.partitions which not sync with relatedTable
* <p>
* Comparing the time of mtmv and relatedTable partitioning,
* if the visibleVersionTime of the base table is later,
* then the partitioning of this mtmv is considered stale
*
* @param mtmv
* @param relatedTable
* @return partitionIds
* @throws DdlException when partition can not found
*/
public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable)
throws AnalysisException {
Set<Long> ids = Sets.newHashSet();
Map<Long, Set<Long>> mvToBasePartitions = getMvToBasePartitions(mtmv, relatedTable);
for (Entry<Long, Set<Long>> entry : mvToBasePartitions.entrySet()) {
for (Long relatedPartitionId : entry.getValue()) {
boolean syncWithRelatedPartition = isSyncWithPartition(mtmv, entry.getKey(), relatedTable,
relatedPartitionId);
if (!syncWithRelatedPartition) {
ids.add(entry.getKey());
break;
}
}
}
return ids;
}
public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (Long partitionId : ids) {
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
}
return res;
}
public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
Set<Long> res = Sets.newHashSet();
for (String partitionName : partitions) {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
res.add(partition.getId());
}
return res;
}
/**
* check if table is sync with all baseTables
*
* @param mtmv
* @return
*/
public static boolean isMTMVSync(MTMV mtmv) {
MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return false;
}
return isMTMVSync(mtmv, mtmv.getRelation().getBaseTables(), Sets.newHashSet(), 0L);
}
/**
* get not sync tables
*
* @param mtmv
* @param partitionId
* @return
* @throws AnalysisException
*/
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
List<String> res = Lists.newArrayList();
long maxAvailableTime = mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
TableIf table = getTable(baseTableInfo);
if (!(table instanceof OlapTable)) {
continue;
}
OlapTable olapTable = (OlapTable) table;
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
.getMvPartitionInfo().getRelatedTable().equals(baseTableInfo)) {
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
long relatedPartitionId = getExistPartitionId(item,
olapTable.getPartitionInfo().getIdToItem(false));
if (relatedPartitionId == -1L) {
throw new AnalysisException("can not found related partition");
}
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, olapTable, relatedPartitionId);
if (!isSyncWithPartition) {
res.add(olapTable.getName());
}
} else {
long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table);
if (tableLastVisibleVersionTime > maxAvailableTime) {
res.add(table.getName());
}
}
}
return res;
}
/**
* Determine which partition of mtmv can be rewritten
*
* @param mtmv
* @param ctx
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) {
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMvRewrite()) {
return res;
}
MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return res;
}
// check mv is normal
if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
&& mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) {
return res;
}
// check gracePeriod
Long gracePeriod = mtmv.getGracePeriod();
// do not care data is delayed
if (gracePeriod < 0) {
return allPartitions;
}
for (Partition partition : allPartitions) {
try {
if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet(),
gracePeriod)) {
res.add(partition);
}
} catch (AnalysisException e) {
// ignore it
LOG.warn("check isMTMVPartitionSync failed", e);
}
}
return res;
}
public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
Collection<Partition> allPartitions = mtmv.getPartitions();
Set<Long> res = Sets.newHashSet();
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
mtmv.getExcludedTriggerTables(),
0L)) {
res.add(partition.getId());
}
} catch (AnalysisException e) {
res.add(partition.getId());
LOG.warn("check isMTMVPartitionSync failed", e);
}
}
return res;
}
/**
* compare last update time of mtmvPartition and tablePartition
*
* @param mtmv
* @param mtmvPartitionId
* @param relatedTable
* @param relatedTablePartitionId
* @return
* @throws AnalysisException
*/
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, OlapTable relatedTable,
Long relatedTablePartitionId) throws AnalysisException {
return mtmv.getPartitionOrAnalysisException(mtmvPartitionId).getVisibleVersionTimeIgnoreInit() >= relatedTable
.getPartitionOrAnalysisException(relatedTablePartitionId).getVisibleVersionTimeIgnoreInit();
}
/**
* like p_00000101_20170201
*
* @param desc
* @return
*/
private static String generatePartitionName(PartitionKeyDesc desc) {
String partitionName = "p_";
partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
.replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
if (partitionName.length() > 50) {
partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName))
+ "_" + System.currentTimeMillis();
}
return partitionName;
}
/**
* drop partition of mtmv
*
* @param mtmv
* @param partitionId
*/
private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
}
/**
* add partition for mtmv like relatedPartitionId of relatedTable
*
* @param mtmv
* @param relatedTable
* @param relatedPartitionId
* @throws AnalysisException
* @throws DdlException
*/
private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long relatedPartitionId)
throws AnalysisException, DdlException {
PartitionDesc partitionDesc = relatedTable.getPartitionInfo().toPartitionDesc(relatedTable);
Partition partition = relatedTable.getPartitionOrAnalysisException(relatedPartitionId);
SinglePartitionDesc oldPartitionDesc = partitionDesc.getSinglePartitionDescByName(partition.getName());
Map<String, String> partitionProperties = Maps.newHashMap();
SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true,
generatePartitionName(oldPartitionDesc.getPartitionKeyDesc()),
oldPartitionDesc.getPartitionKeyDesc(), partitionProperties);
AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc,
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
}
/**
* compare PartitionItem and return equals partitionId
* if not found, return -1L
*
* @param target
* @param sources
* @return
*/
private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) {
for (Entry<Long, PartitionItem> entry : sources.entrySet()) {
if (target.equals(entry.getValue())) {
return entry.getKey();
}
}
return -1L;
}
/**
* Get the maximum update time among all partitions
*
* @param table
* @return
*/
private static long getTableMaxVisibleVersionTime(OlapTable table) {
long result = 0L;
long visibleVersionTime;
for (Partition partition : table.getAllPartitions()) {
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
if (visibleVersionTime > result) {
result = visibleVersionTime;
}
}
return result;
}
/**
* Get the minimum update time among all partitions
*
* @param table
* @return
*/
private static long getTableMinVisibleVersionTime(OlapTable table) {
long result = Long.MAX_VALUE;
long visibleVersionTime;
for (Partition partition : table.getAllPartitions()) {
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
if (visibleVersionTime < result) {
result = visibleVersionTime;
}
}
return result;
}
/**
* Obtain the partition correspondence between materialized views and base tables
* Currently, there is a one-to-one correspondence between the partitions of materialized views and base tables,
* but for scalability reasons, Set is used
* <p>
* before use this method,should call `alignMvPartition`
*
* @param mtmv
* @param relatedTable
* @return mv.partitionId ==> relatedTable.partitionId
*/
private static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv, OlapTable relatedTable)
throws AnalysisException {
HashMap<Long, Set<Long>> res = Maps.newHashMap();
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false);
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false);
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
if (partitionId == -1L) {
throw new AnalysisException("partition not found: " + entry.getValue().toString());
}
res.put(entry.getKey(), Sets.newHashSet(partitionId));
}
return res;
}
/**
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
*
* @param visibleVersionTime
* @param tables
* @param excludedTriggerTables
* @param gracePeriod
* @return
*/
private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) {
long maxAvailableTime = visibleVersionTime + gracePeriod;
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
try {
table = getTable(baseTableInfo);
} catch (AnalysisException e) {
e.printStackTrace();
return false;
}
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
if (!(table instanceof OlapTable)) {
continue;
}
long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table);
if (tableLastVisibleVersionTime > maxAvailableTime) {
return false;
}
}
return true;
}
}

View File

@ -30,6 +30,8 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
@ -89,6 +91,7 @@ import org.apache.doris.nereids.DorisParser.GroupingSetContext;
import org.apache.doris.nereids.DorisParser.HavingClauseContext;
import org.apache.doris.nereids.DorisParser.HintAssignmentContext;
import org.apache.doris.nereids.DorisParser.HintStatementContext;
import org.apache.doris.nereids.DorisParser.IdentifierContext;
import org.apache.doris.nereids.DorisParser.IdentifierListContext;
import org.apache.doris.nereids.DorisParser.IdentifierOrTextContext;
import org.apache.doris.nereids.DorisParser.IdentifierSeqContext;
@ -112,7 +115,6 @@ import org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MapLiteralContext;
import org.apache.doris.nereids.DorisParser.MultiStatementsContext;
import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
import org.apache.doris.nereids.DorisParser.MvRefreshUnitContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
import org.apache.doris.nereids.DorisParser.NullLiteralContext;
@ -550,10 +552,25 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
comment,
desc, properties, logicalPlan, querySql,
new MTMVRefreshInfo(buildMode, refreshMethod, refreshTriggerInfo),
ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols)
ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols),
visitMTMVPartitionInfo(ctx.partitionKey)
));
}
/**
* get MTMVPartitionInfo
*
* @param ctx IdentifierContext
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) {
if (ctx == null) {
return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE);
} else {
return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE, ctx.getText());
}
}
@Override
public List<SimpleColumnDefinition> visitSimpleColumnDefs(SimpleColumnDefsContext ctx) {
if (ctx == null) {
@ -601,19 +618,32 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
int interval = Integer.parseInt(ctx.INTEGER_VALUE().getText());
String startTime = ctx.STARTS() == null ? null
: ctx.STRING_LITERAL().getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1);
IntervalUnit unit = visitMvRefreshUnit(ctx.mvRefreshUnit());
IntervalUnit unit = visitMvRefreshUnit(ctx.refreshUnit);
return new MTMVRefreshSchedule(startTime, interval, unit);
}
@Override
public IntervalUnit visitMvRefreshUnit(MvRefreshUnitContext ctx) {
return IntervalUnit.valueOf(ctx.getText().toUpperCase());
/**
* get IntervalUnit,only enable_job_schedule_second_for_test is true, can use second
*
* @param ctx ctx
* @return IntervalUnit
*/
public IntervalUnit visitMvRefreshUnit(IdentifierContext ctx) {
IntervalUnit intervalUnit = IntervalUnit.fromString(ctx.getText().toUpperCase());
if (null == intervalUnit) {
throw new AnalysisException("interval time unit can not be " + ctx.getText());
}
if (intervalUnit.equals(IntervalUnit.SECOND)
&& !Config.enable_job_schedule_second_for_test) {
throw new AnalysisException("interval time unit can not be second");
}
return intervalUnit;
}
@Override
public RefreshMethod visitRefreshMethod(RefreshMethodContext ctx) {
if (ctx == null) {
return RefreshMethod.COMPLETE;
return RefreshMethod.AUTO;
}
return RefreshMethod.valueOf(ctx.getText().toUpperCase());
}
@ -631,7 +661,19 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
@Override
public RefreshMTMVCommand visitRefreshMTMV(RefreshMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
return new RefreshMTMVCommand(new RefreshMTMVInfo(new TableNameInfo(nameParts)));
List<String> partitions = ImmutableList.of();
if (ctx.partitionSpec() != null) {
if (ctx.partitionSpec().TEMPORARY() != null) {
throw new AnalysisException("Not allowed to specify TEMPORARY ");
}
if (ctx.partitionSpec().partition != null) {
partitions = ImmutableList.of(ctx.partitionSpec().partition.getText());
} else {
partitions = visitIdentifierList(ctx.partitionSpec().partitions);
}
}
return new RefreshMTMVCommand(new RefreshMTMVInfo(new TableNameInfo(nameParts),
partitions, ctx.COMPLETE() != null));
}
@Override

View File

@ -82,7 +82,7 @@ public abstract class AbstractMaterializedViewRule {
queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) {
continue;
}
Plan mvPlan = materializationContext.getMtmv().getMvCache().getLogicalPlan();
Plan mvPlan = materializationContext.getMtmv().getCache().getLogicalPlan();
List<StructInfo> viewStructInfos = extractStructInfo(mvPlan, cascadesContext);
if (viewStructInfos.size() > 1) {
// view struct info should only have one

View File

@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCacheManager;
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.PlannerHook;
@ -70,10 +70,10 @@ public class InitMaterializationContextHook implements PlannerHook {
}
List<BaseTableInfo> baseTableUsed =
collectedTables.stream().map(BaseTableInfo::new).collect(Collectors.toList());
// TODO the logic should be move to MTMVCacheManager later when getAvailableMaterializedView is ready in
// TODO the logic should be move to MTMVRelationManager later when getAvailableMaterializedView is ready in
// MV Cache manager
Env env = cascadesContext.getConnectContext().getEnv();
MTMVCacheManager cacheManager = env.getMtmvService().getCacheManager();
MTMVRelationManager cacheManager = env.getMtmvService().getRelationManager();
Set<BaseTableInfo> materializedViews = new HashSet<>();
for (BaseTableInfo baseTableInfo : baseTableUsed) {
Set<BaseTableInfo> mtmvsByBaseTable = cacheManager.getMtmvsByBaseTable(baseTableInfo);

View File

@ -19,7 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
import org.apache.doris.mtmv.MVCache;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupId;
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
@ -59,17 +59,17 @@ public class MaterializationContext {
this.mvScanPlan = mvScanPlan;
this.baseTables = baseTables;
this.baseViews = baseViews;
MVCache mvCache = mtmv.getMvCache();
MTMVCache mtmvCache = mtmv.getCache();
// TODO This logic should move to materialized view cache manager
if (mvCache == null) {
mvCache = MVCache.from(mtmv, cascadesContext.getConnectContext());
mtmv.setMvCache(mvCache);
if (mtmvCache == null) {
mtmvCache = mtmvCache.from(mtmv, cascadesContext.getConnectContext());
mtmv.setCache(mtmvCache);
}
// mv output expression shuttle, this will be used to expression rewrite
this.mvExprToMvScanExprMapping = ExpressionMapping.generate(
ExpressionUtils.shuttleExpressionWithLineage(
mvCache.getMvOutputExpressions(),
mvCache.getLogicalPlan()),
mtmvCache.getMvOutputExpressions(),
mtmvCache.getLogicalPlan()),
mvScanPlan.getExpressions());
}

View File

@ -31,7 +31,7 @@ import java.util.Objects;
/**
* alter multi table materialized view
*/
public class AlterMTMVCommand extends Command implements ForwardWithSync {
public class AlterMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
public static final Logger LOG = LogManager.getLogger(AlterMTMVCommand.class);
private final AlterMTMVInfo alterMTMVInfo;

View File

@ -32,7 +32,7 @@ import java.util.Objects;
/**
* create multi table materialized view
*/
public class CreateMTMVCommand extends Command implements ForwardWithSync {
public class CreateMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
public static final Logger LOG = LogManager.getLogger(CreateMTMVCommand.class);
private final CreateMTMVInfo createMTMVInfo;

View File

@ -29,7 +29,7 @@ import java.util.Objects;
/**
* refresh mtmv
*/
public class DropMTMVCommand extends Command implements ForwardWithSync {
public class DropMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
private final DropMTMVInfo dropMTMVInfo;
public DropMTMVCommand(DropMTMVInfo dropMTMVInfo) {

View File

@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
/**
* The class that implements this interface does not allow fallback to OriginalPlanner,
* for example, some new features are not implemented by the old parser
*/
public interface NotAllowFallback {
}

View File

@ -29,7 +29,7 @@ import java.util.Objects;
/**
* refresh mtmv
*/
public class RefreshMTMVCommand extends Command implements ForwardWithSync {
public class RefreshMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
private final RefreshMTMVInfo refreshMTMVInfo;
public RefreshMTMVCommand(RefreshMTMVInfo refreshMTMVInfo) {

View File

@ -17,12 +17,15 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.parser.NereidsParser;
@ -33,11 +36,13 @@ import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -62,38 +67,41 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
* Construct command
*
* @param mv materialize view
* @param partitions update partitions in mv and tables
* @param partitionIds update partitions in mv and tables
* @param tableWithPartKey the partitions key for different table
* @return command
*/
public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem> partitions,
public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds,
Map<OlapTable, String> tableWithPartKey) {
NereidsParser parser = new NereidsParser();
Map<OlapTable, Set<Expression>> predicates =
constructTableWithPredicates(partitions, tableWithPartKey);
List<String> parts = constructPartsForMv(mv, partitions);
constructTableWithPredicates(mv, partitionIds, tableWithPartKey);
List<String> parts = constructPartsForMv(mv, partitionIds);
Plan plan = parser.parseSingle(mv.getQuerySql());
plan = plan.accept(new PredicateAdder(), predicates);
if (plan instanceof Sink) {
plan = plan.child(0);
}
UnboundTableSink<? extends Plan> sink =
new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(),
parts, plan);
return new UpdateMvByPartitionCommand(sink);
}
private static List<String> constructPartsForMv(MTMV mv, Set<PartitionItem> partitions) {
return mv.getPartitionNames().stream()
.filter(name -> {
PartitionItem mvPartItem = mv.getPartitionInfo().getItem(mv.getPartition(name).getId());
return partitions.stream().anyMatch(p -> p.getIntersect(mvPartItem) != null);
})
private static List<String> constructPartsForMv(MTMV mv, Set<Long> partitionIds) {
return partitionIds.stream()
.map(id -> mv.getPartition(id).getName())
.collect(ImmutableList.toImmutableList());
}
private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(Set<PartitionItem> partitions,
Map<OlapTable, String> tableWithPartKey) {
private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(MTMV mv,
Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) {
Set<PartitionItem> items = partitionIds.stream()
.map(id -> mv.getPartitionInfo().getItem(id))
.collect(ImmutableSet.toImmutableSet());
ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new ImmutableMap.Builder<>();
tableWithPartKey.forEach((table, colName) ->
builder.put(table, constructPredicates(partitions, colName))
builder.put(table, constructPredicates(items, colName))
);
return builder.build();
}
@ -131,8 +139,15 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, Set<Expression>>> {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable, Set<Expression>> predicates) {
return new LogicalFilter<>(predicates.get(scan.getTable()), scan);
public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<OlapTable, Set<Expression>> predicates) {
List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(),
unboundRelation.getNameParts());
TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv());
if (table instanceof OlapTable && predicates.containsKey(table)) {
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
unboundRelation);
}
return unboundRelation;
}
}
}

View File

@ -19,28 +19,41 @@ package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.analysis.CreateMTMVStmt;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.ListPartitionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
@ -56,6 +69,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@ -78,6 +92,9 @@ public class CreateMTMVInfo {
private final List<ColumnDefinition> columns = Lists.newArrayList();
private final List<SimpleColumnDefinition> simpleColumnDefinitions;
private final EnvInfo envInfo;
private final MTMVPartitionInfo mvPartitionInfo;
private PartitionDesc partitionDesc;
private MTMVRelation relation;
/**
* constructor for create MTMV
@ -87,7 +104,8 @@ public class CreateMTMVInfo {
DistributionDescriptor distribution, Map<String, String> properties,
LogicalPlan logicalQuery, String querySql,
MTMVRefreshInfo refreshInfo,
List<SimpleColumnDefinition> simpleColumnDefinitions) {
List<SimpleColumnDefinition> simpleColumnDefinitions,
MTMVPartitionInfo mvPartitionInfo) {
this.ifNotExists = Objects.requireNonNull(ifNotExists, "require ifNotExists object");
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
this.keys = Utils.copyRequiredList(keys);
@ -101,6 +119,8 @@ public class CreateMTMVInfo {
.requireNonNull(simpleColumnDefinitions, "require simpleColumnDefinitions object");
this.envInfo = new EnvInfo(ConnectContext.get().getCurrentCatalog().getId(),
ConnectContext.get().getCurrentDbId());
this.mvPartitionInfo = Objects
.requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo object");
}
/**
@ -154,6 +174,11 @@ public class CreateMTMVInfo {
mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod);
properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables);
properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
}
}
/**
@ -163,22 +188,93 @@ public class CreateMTMVInfo {
// create table as select
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
if (plan.anyMatch(node -> node instanceof OneRowRelation)) {
throw new AnalysisException("at least contain one table");
}
// can not contain VIEW or MTMV
analyzeBaseTables(plan);
analyzeExpressions((PhysicalPlan) plan);
// can not contain Random function
analyzeExpressions(planner.getAnalyzedPlan());
// can not contain partition or tablets
boolean containTableQueryOperator = MaterializedViewUtils.containTableQueryOperator(planner.getAnalyzedPlan());
if (containTableQueryOperator) {
throw new AnalysisException("can not contain invalid expression");
}
getRelation(planner);
getColumns(plan);
analyzePartition(planner);
}
private void getRelation(NereidsPlanner planner) {
Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
}
private void analyzePartition(NereidsPlanner planner) {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan);
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable base table for partitioning");
}
TableIf followTable = null;
try {
followTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!(followTable instanceof OlapTable)) {
throw new AnalysisException("base table for partitioning only can be OlapTable.");
}
Set<String> partitionColumnNames;
try {
partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames();
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (partitionColumnNames.size() != 1) {
throw new AnalysisException("base table for partitioning only support single column.");
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
partitionDesc = generatePartitionDesc((OlapTable) followTable);
}
}
private PartitionDesc generatePartitionDesc(OlapTable relatedTable) {
PartitionType type = relatedTable.getPartitionInfo().getType();
try {
if (type == PartitionType.RANGE) {
return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
Lists.newArrayList());
} else if (type == PartitionType.LIST) {
return new ListPartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
Lists.newArrayList());
} else {
return null;
}
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException("can not generate partitionDesc", e);
}
}
private void analyzeBaseTables(Plan plan) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW));
new TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.VIEW));
plan.accept(TableCollector.INSTANCE, collectorContext);
List<TableIf> collectedTables = collectorContext.getCollectedTables();
if (!CollectionUtils.isEmpty(collectedTables)) {
throw new AnalysisException("can not contain MATERIALIZED_VIEW");
throw new AnalysisException("can not contain MATERIALIZED_VIEW or VIEW");
}
}
private void analyzeExpressions(PhysicalPlan plan) {
private void analyzeExpressions(Plan plan) {
List<TreeNode<Expression>> functionCollectResult = new ArrayList<>();
plan.accept(NondeterministicFunctionCollector.INSTANCE, functionCollectResult);
if (!CollectionUtils.isEmpty(functionCollectResult)) {
@ -225,7 +321,8 @@ public class CreateMTMVInfo {
.map(ColumnDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
return new CreateMTMVStmt(ifNotExists, tableName, catalogColumns, refreshInfo, keysDesc,
distribution.translateToCatalogStyle(), properties, mvProperties, querySql, comment, envInfo);
distribution.translateToCatalogStyle(), properties, mvProperties, querySql, comment, envInfo,
partitionDesc, mvPartitionInfo, relation);
}
}

View File

@ -17,12 +17,22 @@
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Objects;
/**
@ -30,13 +40,18 @@ import java.util.Objects;
*/
public class RefreshMTMVInfo {
private final TableNameInfo mvName;
private List<String> partitions;
private boolean isComplete;
public RefreshMTMVInfo(TableNameInfo mvName) {
public RefreshMTMVInfo(TableNameInfo mvName, List<String> partitions, boolean isComplete) {
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
this.partitions = Utils.copyRequiredList(partitions);
this.isComplete = Objects.requireNonNull(isComplete, "require isComplete object");
}
/**
* analyze refresh info
*
* @param ctx ConnectContext
*/
public void analyze(ConnectContext ctx) {
@ -48,13 +63,41 @@ public class RefreshMTMVInfo {
mvName.getDb() + ": " + mvName.getTbl());
throw new AnalysisException(message);
}
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
if (!CollectionUtils.isEmpty(partitions)) {
MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
}
} catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) {
throw new AnalysisException(e.getMessage());
}
}
/**
* getMvName
*
* @return TableNameInfo
*/
public TableNameInfo getMvName() {
return mvName;
}
/**
* getPartitions
*
* @return partitionNames
*/
public List<String> getPartitions() {
return partitions;
}
/**
* isComplete
*
* @return isComplete
*/
public boolean isComplete() {
return isComplete;
}
}

View File

@ -127,6 +127,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapScanNode;
@ -439,6 +440,14 @@ public class StmtExecutor {
execute(queryId);
}
public boolean notAllowFallback() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
return logicalPlan instanceof NotAllowFallback;
}
return false;
}
public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
@ -457,6 +466,10 @@ public class StmtExecutor {
// try to fall back to legacy planner
LOG.debug("nereids cannot process statement\n" + originStmt.originStmt
+ "\n because of " + e.getMessage(), e);
if (notAllowFallback()) {
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}
boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter
&& ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
if (e instanceof NereidsException

View File

@ -32,6 +32,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
@ -537,6 +538,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(mv.getQuerySql()));
trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString()));
trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString()));
trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
dataBatch.add(trow);
}
}

View File

@ -54,7 +54,9 @@ public class MvInfosTableValuedFunction extends MetadataTableValuedFunction {
new Column("RefreshInfo", ScalarType.createStringType()),
new Column("QuerySql", ScalarType.createStringType()),
new Column("EnvInfo", ScalarType.createStringType()),
new Column("MvProperties", ScalarType.createStringType()));
new Column("MvProperties", ScalarType.createStringType()),
new Column("MvPartitionInfo", ScalarType.createStringType()),
new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN)));
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;