[feature](mtmv)(1)remove old mtmv code (#26041)

remove old mtmv code,we will implement mtmv in a new way
This commit is contained in:
zhangdong
2023-10-30 19:49:45 +08:00
committed by GitHub
parent f97c40d0e1
commit 2a74d9a8c8
80 changed files with 35 additions and 6119 deletions

View File

@ -25,12 +25,10 @@ import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropPartitionFromIndexClause;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
import org.apache.doris.analysis.ModifyColumnCommentClause;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.ModifyEngineClause;
@ -38,7 +36,6 @@ import org.apache.doris.analysis.ModifyPartitionClause;
import org.apache.doris.analysis.ModifyTableCommentClause;
import org.apache.doris.analysis.ModifyTablePropertiesClause;
import org.apache.doris.analysis.PartitionRenameClause;
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.ReplaceTableClause;
import org.apache.doris.analysis.RollupRenameClause;
@ -48,7 +45,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
@ -66,9 +62,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.mtmv.MTMVJobFactory;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.persist.AlterMultiMaterializedView;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
@ -126,11 +119,6 @@ public class Alter {
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
}
public void processCreateMultiTableMaterializedView(CreateMultiTableMaterializedViewStmt stmt)
throws UserException {
((MaterializedViewHandler) materializedViewHandler).processCreateMultiTablesMaterializedView(stmt);
}
public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
if (!stmt.isForMTMV() && stmt.getTableName() == null) {
throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
@ -154,16 +142,6 @@ public class Alter {
}
}
public void processRefreshMaterializedView(RefreshMaterializedViewStmt stmt)
throws DdlException, MetaNotFoundException {
if (stmt.getRefreshMethod() != RefreshMethod.COMPLETE) {
throw new DdlException("Now only support REFRESH COMPLETE.");
}
String db = stmt.getMvName().getDb();
String tbl = stmt.getMvName().getTbl();
Env.getCurrentEnv().getMTMVJobManager().refreshMTMV(db, tbl);
}
private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List<AlterClause> alterClauses,
final String clusterName, Database db) throws UserException {
if (olapTable.getDataSortInfo() != null
@ -537,41 +515,6 @@ public class Alter {
}
}
public void processAlterMaterializedView(AlterMultiMaterializedView alterView, boolean isReplay)
throws UserException {
TableName tbl = alterView.getMvName();
MaterializedView olapTable = null;
try {
// 1. check mv exist
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb());
olapTable = (MaterializedView) db.getTableOrMetaException(tbl.getTbl(), TableType.MATERIALIZED_VIEW);
// 2. drop old job and kill the associated tasks
Env.getCurrentEnv().getMTMVJobManager().dropJobByName(tbl.getDb(), tbl.getTbl(), isReplay);
// 3. overwrite the refresh info in the memory of fe.
olapTable.writeLock();
olapTable.setRefreshInfo(alterView.getInfo());
// 4. log it and replay it in the follower
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logAlterMTMV(alterView);
// 5. master node generate new jobs
if (MTMVJobFactory.isGenerateJob(olapTable)) {
List<MTMVJob> jobs = MTMVJobFactory.buildJob(olapTable, db.getFullName());
for (MTMVJob job : jobs) {
Env.getCurrentEnv().getMTMVJobManager().createJob(job, false);
}
LOG.info("Alter mv success with new mv job created.");
}
}
} finally {
if (olapTable != null) {
olapTable.writeUnlock();
}
}
}
// entry of processing replace table
private void processReplaceTable(Database db, OlapTable origTable, List<AlterClause> alterClauses)
throws UserException {

View File

@ -23,7 +23,6 @@ import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropRollupClause;
import org.apache.doris.analysis.MVColumnItem;
@ -44,7 +43,6 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
@ -53,7 +51,6 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.PropertyAnalyzer;
@ -1292,15 +1289,4 @@ public class MaterializedViewHandler extends AlterHandler {
public Map<Long, Set<Long>> getTableRunningJobMap() {
return tableRunningJobMap;
}
public void processCreateMultiTablesMaterializedView(CreateMultiTableMaterializedViewStmt addMVClause)
throws UserException {
Map<String, TableIf> olapTables = addMVClause.getTables();
try {
olapTables.values().forEach(TableIf::readLock);
Env.getCurrentEnv().createTable(addMVClause);
} finally {
olapTables.values().forEach(TableIf::readUnlock);
}
}
}

View File

@ -1,65 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
public class AlterMaterializedViewStmt extends DdlStmt {
private TableName mvName;
private MVRefreshInfo info;
public AlterMaterializedViewStmt(TableName mvName, MVRefreshInfo info) {
this.mvName = mvName;
this.info = info;
}
public TableName getTable() {
return mvName;
}
public MVRefreshInfo getRefreshInfo() {
return info;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
mvName.analyze(analyzer);
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(), mvName.getTbl(),
PrivPredicate.ALTER)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "MATERIALIZED VIEW",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
mvName.getDb() + ": " + mvName.getTbl());
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("ALTER MATERIALIZED VIEW ").append(mvName.toSql()).append(" ").append(info.toString());
return sb.toString();
}
}

View File

@ -1,135 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* This class is used for {@link org.apache.doris.catalog.MaterializedView}.
* When we create a materialized view for multiple tables, we can specify the partition of the view
* which is as consistent as one of base tables.
*/
public class ColumnPartitionDesc extends PartitionDesc {
private final List<SlotRef> columns;
public ColumnPartitionDesc(List<SlotRef> columns)
throws AnalysisException {
this.columns = columns;
}
public void analyze(Analyzer analyzer, CreateMultiTableMaterializedViewStmt stmt) throws AnalysisException {
for (SlotRef column : columns) {
column.analyze(analyzer);
}
Table olapTable = matchTable(stmt.getTables());
PartitionDesc partitionDesc = ((OlapTable) olapTable).getPartitionInfo().toPartitionDesc((OlapTable) olapTable);
type = partitionDesc.getType();
partitionColNames = toMVPartitionColumnNames(olapTable.getName(), partitionDesc.getPartitionColNames(),
stmt.getQueryStmt());
singlePartitionDescs = partitionDesc.getSinglePartitionDescs();
}
private Table matchTable(Map<String, TableIf> olapTables) throws AnalysisException {
Table matched = null;
for (SlotRef column : columns) {
TableIf table = olapTables.get(column.getDesc().getParent().getTable().getName());
if (table != null) {
if (!(table instanceof OlapTable)) {
throw new AnalysisException(
"Can not get the partition information from a table whose type isn't OLAP.");
}
if (matched != null && !matched.getName().equals(table.getName())) {
throw new AnalysisException("The partition columns must be in the same table.");
} else if (matched == null) {
matched = (Table) table;
}
}
}
if (matched == null) {
throw new AnalysisException("The partition columns doesn't match the ones in base table.");
}
PartitionInfo partitionInfo = ((OlapTable) matched).getPartitionInfo();
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
if (!columns.stream().map(SlotRef::getColumn).collect(Collectors.toList()).equals(partitionColumns)) {
throw new AnalysisException("The partition columns doesn't match the ones in base table "
+ matched.getName() + ".");
}
return matched;
}
private List<String> toMVPartitionColumnNames(String tableName, List<String> partitionColNames, QueryStmt queryStmt)
throws AnalysisException {
List<String> mvPartitionColumnNames = Lists.newArrayListWithCapacity(partitionColNames.size());
if (queryStmt instanceof SelectStmt) {
List<SelectListItem> items = ((SelectStmt) queryStmt).getSelectList().getItems();
for (String partitionColName : partitionColNames) {
String mvColumnName = null;
for (int i = 0; mvColumnName == null && i < items.size(); ++i) {
SelectListItem item = items.get(i);
if (item.isStar()) {
mvColumnName = partitionColName;
} else if (item.getExpr() instanceof SlotRef) {
SlotRef slotRef = (SlotRef) item.getExpr();
if (slotRef.getTableName().getTbl().equals(tableName) && slotRef.getColumnName()
.equals(partitionColName)) {
mvColumnName = item.getAlias() == null ? partitionColName : item.getAlias();
}
}
}
if (mvColumnName != null) {
mvPartitionColumnNames.add(mvColumnName);
} else {
throw new AnalysisException(
"Failed to map the partition column name " + partitionColName + " to mv column");
}
}
} else {
throw new AnalysisException("Only select statement is supported.");
}
return mvPartitionColumnNames;
}
@Override
public PartitionInfo toPartitionInfo(List<Column> schema, Map<String, Long> partitionNameToId, boolean isTemp)
throws DdlException, AnalysisException {
switch (type) {
case RANGE:
return new RangePartitionDesc(partitionColNames,
singlePartitionDescs.stream().map(desc -> (AllPartitionDesc) desc)
.collect(Collectors.toList())).toPartitionInfo(schema, partitionNameToId, isTemp);
case LIST:
return new ListPartitionDesc(partitionColNames,
singlePartitionDescs.stream().map(desc -> (AllPartitionDesc) desc)
.collect(Collectors.toList())).toPartitionInfo(schema, partitionNameToId, isTemp);
default:
throw new RuntimeException("Invalid partition type.");
}
}
}

View File

@ -1,232 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
private final String mvName;
private final BuildMode buildMode;
private final MVRefreshInfo refreshInfo;
private final QueryStmt queryStmt;
private final Map<String, TableIf> tables = Maps.newHashMap();
public CreateMultiTableMaterializedViewStmt(String mvName, BuildMode buildMode,
MVRefreshInfo refreshInfo, KeysDesc keyDesc, PartitionDesc partitionDesc, DistributionDesc distributionDesc,
Map<String, String> properties, QueryStmt queryStmt) {
this.mvName = mvName;
this.buildMode = buildMode;
this.refreshInfo = refreshInfo;
this.queryStmt = queryStmt;
this.keysDesc = keyDesc;
this.partitionDesc = partitionDesc;
this.distributionDesc = distributionDesc;
this.properties = properties;
engineName = DEFAULT_ENGINE_NAME;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (!Config.enable_mtmv) {
throw new UserException("Multi table materialized view was not graduated."
+ " You should set `enable_mtmv = true` in fe to enabled it manually.");
}
refreshInfo.analyze(analyzer);
queryStmt.setNeedToSql(true);
queryStmt.setToSQLWithHint(true);
queryStmt.analyze(analyzer);
if (queryStmt instanceof SelectStmt) {
analyzeSelectClause((SelectStmt) queryStmt);
}
String defaultDb = analyzer.getDefaultDb();
if (Strings.isNullOrEmpty(defaultDb)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, defaultDb, mvName);
if (partitionDesc != null) {
((ColumnPartitionDesc) partitionDesc).analyze(analyzer, this);
}
super.analyze(analyzer);
}
private void analyzeSelectClause(SelectStmt selectStmt) throws AnalysisException {
for (TableRef tableRef : selectStmt.getTableRefs()) {
TableIf table = null;
if (tableRef instanceof BaseTableRef) {
String dbName = tableRef.getName().getDb();
String ctlName = tableRef.getName().getCtl();
CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
if (catalogIf == null) {
throw new AnalysisException("Failed to get the catalog: " + ctlName);
}
Optional<DatabaseIf> dbIf = catalogIf.getDb(dbName);
if (!dbIf.isPresent()) {
throw new AnalysisException("Failed to get the database: " + dbName);
}
Optional<TableIf> tableIf = dbIf.get().getTable(tableRef.getName().getTbl());
if (!tableIf.isPresent()) {
throw new AnalysisException("Failed to get the table: " + tableRef.getName().getTbl());
}
table = tableIf.orElse(null);
} else if (tableRef instanceof InlineViewRef) {
InlineViewRef inlineViewRef = (InlineViewRef) tableRef;
table = inlineViewRef.getDesc().getTable();
}
if (table == null) {
throw new AnalysisException("Failed to get the table by " + tableRef);
}
tables.put(table.getName(), table);
tables.put(tableRef.getAlias(), table);
}
checkSelectListItems(selectStmt.getSelectList().getItems());
columnDefs = generateColumnDefinitions(selectStmt);
}
private void checkSelectListItems(List<SelectListItem> items) throws AnalysisException {
for (SelectListItem item : items) {
if (item.isStar()) {
continue;
}
Expr itemExpr = item.getExpr();
String alias = item.getAlias();
if (itemExpr instanceof SlotRef) {
continue;
} else if (itemExpr instanceof FunctionCallExpr && ((FunctionCallExpr) itemExpr).isAggregateFunction()) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) itemExpr;
String functionName = functionCallExpr.getFnName().getFunction();
MVColumnPattern mvColumnPattern = CreateMaterializedViewStmt.FN_NAME_TO_PATTERN
.get(functionName.toLowerCase());
if (mvColumnPattern == null) {
throw new AnalysisException(
"Materialized view does not support this function:" + functionCallExpr.toSqlImpl());
}
if (!mvColumnPattern.match(functionCallExpr)) {
throw new AnalysisException(
"The function " + functionName + " must match pattern:" + mvColumnPattern);
}
if (StringUtils.isEmpty(alias)) {
throw new AnalysisException("Function expr: " + functionName + " must have a alias name for MTMV.");
}
} else {
throw new AnalysisException(
"Materialized view does not support this expr:" + itemExpr.toSqlImpl());
}
}
}
private List<ColumnDef> generateColumnDefinitions(SelectStmt selectStmt) throws AnalysisException {
List<Column> schema = generateSchema(selectStmt);
return schema.stream()
.map(column -> new ColumnDef(
column.getName(),
new TypeDef(column.getType()),
column.isKey(),
null,
column.isAllowNull(),
column.isAutoInc(),
new DefaultValue(column.getDefaultValue() != null, column.getDefaultValue()),
column.getComment())
).collect(Collectors.toList());
}
private List<Column> generateSchema(SelectStmt selectStmt) throws AnalysisException {
ArrayList<Expr> resultExprs = selectStmt.getResultExprs();
ArrayList<String> colLabels = selectStmt.getColLabels();
Map<String, Column> uniqueMVColumnItems = Maps.newLinkedHashMap();
for (int i = 0; i < resultExprs.size(); i++) {
Column column = generateMTMVColumn(resultExprs.get(i), colLabels.get(i));
if (uniqueMVColumnItems.put(column.getName(), column) != null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, column.getName());
}
}
return Lists.newArrayList(uniqueMVColumnItems.values());
}
private Column generateMTMVColumn(Expr expr, String colLabel) {
return new Column(colLabel.toLowerCase(), expr.getType(), true);
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("CREATE MATERIALIZED VIEW ").append(mvName).append(" BUILD ").append(buildMode.toString());
if (refreshInfo != null) {
sb.append(" ").append(refreshInfo);
}
if (partitionDesc != null) {
sb.append(" ").append(partitionDesc);
}
if (distributionDesc != null) {
sb.append(" ").append(distributionDesc);
}
if (properties != null && !properties.isEmpty()) {
sb.append("\nPROPERTIES (");
sb.append(new PrintableMap<>(properties, " = ", true, true, true));
sb.append(")");
}
sb.append(" AS ").append(queryStmt.toSql());
return sb.toString();
}
public String getMVName() {
return mvName;
}
public Map<String, TableIf> getTables() {
return tables;
}
public MVRefreshInfo getRefreshInfo() {
return refreshInfo;
}
public QueryStmt getQueryStmt() {
return queryStmt;
}
public BuildMode getBuildMode() {
return buildMode;
}
}

View File

@ -559,8 +559,7 @@ public class CreateTableStmt extends DdlStmt {
tableName.getCtl(), tableName.getDb(), properties);
// analyze partition
if (partitionDesc != null) {
if (partitionDesc instanceof ListPartitionDesc || partitionDesc instanceof RangePartitionDesc
|| partitionDesc instanceof ColumnPartitionDesc) {
if (partitionDesc instanceof ListPartitionDesc || partitionDesc instanceof RangePartitionDesc) {
partitionDesc.analyze(columnDefs, properties);
} else {
throw new AnalysisException("Currently only support range"

View File

@ -1,91 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.UserException;
import com.google.gson.annotations.SerializedName;
public class MVRefreshInfo {
@SerializedName("neverRefresh")
private boolean neverRefresh;
@SerializedName("refreshMethod")
private RefreshMethod refreshMethod;
@SerializedName("triggerInfo")
private MVRefreshTriggerInfo triggerInfo;
// For deserialization
public MVRefreshInfo() {}
public MVRefreshInfo(boolean neverRefresh) {
this(neverRefresh, RefreshMethod.COMPLETE, null);
}
public MVRefreshInfo(RefreshMethod method, MVRefreshTriggerInfo trigger) {
this(trigger == null, method, trigger);
}
public MVRefreshInfo(boolean neverRefresh, RefreshMethod method, MVRefreshTriggerInfo trigger) {
this.neverRefresh = neverRefresh;
refreshMethod = method;
triggerInfo = trigger;
}
void analyze(Analyzer analyzer) throws UserException {
if (triggerInfo != null) {
triggerInfo.analyze(analyzer);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (neverRefresh) {
sb.append(" NEVER REFRESH ");
} else {
sb.append(" REFRESH ");
sb.append(refreshMethod.toString());
sb.append(triggerInfo.toString());
}
return sb.toString();
}
public boolean isNeverRefresh() {
return neverRefresh;
}
public RefreshMethod getRefreshMethod() {
return refreshMethod;
}
public MVRefreshTriggerInfo getTriggerInfo() {
return triggerInfo;
}
public enum RefreshMethod {
COMPLETE, FAST, FORCE
}
public enum BuildMode {
IMMEDIATE, DEFERRED
}
public enum RefreshTrigger {
DEMAND, COMMIT, INTERVAL
}
}

View File

@ -1,63 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import com.google.gson.annotations.SerializedName;
public class MVRefreshIntervalTriggerInfo {
@SerializedName("startTime")
private String startTime;
@SerializedName("interval")
private long interval;
@SerializedName("timeUnit")
private String timeUnit;
// For deserialization
public MVRefreshIntervalTriggerInfo() {
}
public MVRefreshIntervalTriggerInfo(String startTime, long interval, String timeUnit) {
this.startTime = startTime;
this.interval = interval;
this.timeUnit = timeUnit;
}
public String getStartTime() {
return startTime;
}
public long getInterval() {
return interval;
}
public String getTimeUnit() {
return timeUnit;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (startTime != null) {
sb.append(" START WITH \"").append(startTime).append("\"");
}
if (interval > 0) {
sb.append(" NEXT ").append(interval).append(" ").append(timeUnit);
}
return sb.toString();
}
}

View File

@ -1,77 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.MVRefreshInfo.RefreshTrigger;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.gson.annotations.SerializedName;
public class MVRefreshTriggerInfo {
@SerializedName("refreshTrigger")
private RefreshTrigger refreshTrigger;
@SerializedName("intervalTrigger")
private MVRefreshIntervalTriggerInfo intervalTrigger;
// For deserialization
public MVRefreshTriggerInfo() {}
public MVRefreshTriggerInfo(RefreshTrigger trigger) {
this(trigger, null);
}
public MVRefreshTriggerInfo(MVRefreshIntervalTriggerInfo trigger) {
this(RefreshTrigger.INTERVAL, trigger);
}
public MVRefreshTriggerInfo(RefreshTrigger refreshTrigger, MVRefreshIntervalTriggerInfo intervalTrigger) {
this.refreshTrigger = refreshTrigger;
this.intervalTrigger = intervalTrigger;
}
void analyze(Analyzer analyzer) throws UserException {
if (refreshTrigger == RefreshTrigger.INTERVAL && (intervalTrigger == null || (
intervalTrigger.getStartTime() == null && intervalTrigger.getInterval() < 0))) {
throw new AnalysisException("Start time or interval is required.");
} else if (refreshTrigger == null) {
throw new AnalysisException("refresh trigger is required.");
}
}
public RefreshTrigger getRefreshTrigger() {
return refreshTrigger;
}
public MVRefreshIntervalTriggerInfo getIntervalTrigger() {
return intervalTrigger;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (refreshTrigger != RefreshTrigger.INTERVAL) {
sb.append(" ON ");
sb.append(refreshTrigger.toString());
} else {
sb.append(intervalTrigger.toString());
}
return sb.toString();
}
}

View File

@ -1,76 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
/**
* REFRESH MATERIALIZED VIEW [db_name].<mv_name> [complete|fast];
*
* Parameters
* [complete|fast]: complete or incremental refresh.
* mv_name: The name of the materialized view to refresh.
*/
public class RefreshMaterializedViewStmt extends DdlStmt {
private TableName mvName;
private RefreshMethod refreshMethod;
public RefreshMaterializedViewStmt(TableName mvName, RefreshMethod refreshMethod) {
this.mvName = mvName;
if (refreshMethod == null) {
this.refreshMethod = RefreshMethod.COMPLETE;
}
this.refreshMethod = refreshMethod;
}
public TableName getMvName() {
return mvName;
}
public RefreshMethod getRefreshMethod() {
return refreshMethod;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
mvName.analyze(analyzer);
// check access
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(),
mvName.getTbl(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
@Override
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("REFRESH MATERIALIZED VIEW ")
.append(mvName.toSql())
.append(" ")
.append(refreshMethod.toString());
return stringBuilder.toString();
}
}

View File

@ -1,143 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.UserException;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
public class ShowMTMVJobStmt extends ShowStmt {
private final String jobName; // optional
private final String dbName; // optional
private final TableName mvName; // optional
private String analyzedDbName;
public ShowMTMVJobStmt() {
this.jobName = null;
this.dbName = null;
this.mvName = null;
}
public ShowMTMVJobStmt(String jobName) {
this.jobName = jobName;
this.dbName = null;
this.mvName = null;
}
public ShowMTMVJobStmt(String dbName, TableName mvName) {
this.jobName = null;
this.dbName = dbName;
this.mvName = mvName;
}
public boolean isShowAllJobs() {
return analyzedDbName == null && mvName == null && jobName == null;
}
public boolean isShowAllJobsFromDb() {
return analyzedDbName != null && mvName == null;
}
public boolean isShowAllJobsOnMv() {
return mvName != null;
}
public boolean isSpecificJob() {
return jobName != null;
}
public String getDbName() {
return analyzedDbName;
}
public String getMVName() {
return mvName != null ? mvName.getTbl() : null;
}
public String getJobName() {
return jobName;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (dbName != null && mvName != null && mvName.getDb() != null && !dbName.equals(mvName.getDb())) {
throw new UserException("Database name should be same when they both been set.");
}
analyzedDbName = dbName;
if (Strings.isNullOrEmpty(analyzedDbName)) {
if (mvName != null) {
analyzedDbName = mvName.getDb();
}
if (Strings.isNullOrEmpty(analyzedDbName)) {
analyzedDbName = analyzer.getDefaultDb();
}
}
if (!Strings.isNullOrEmpty(analyzedDbName)) {
analyzedDbName = ClusterNamespace.getFullName(getClusterName(), analyzedDbName);
}
}
@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : MTMVJob.SHOW_TITLE_NAMES) {
if (title.equals("Query")) {
builder.addColumn(new Column(title, ScalarType.createVarchar(10240)));
} else {
builder.addColumn(new Column(title, ScalarType.createVarchar(1024)));
}
}
return builder.build();
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("SHOW MTMV JOB");
if (!Strings.isNullOrEmpty(jobName)) {
sb.append(" FOR ");
sb.append(getJobName());
}
if (!Strings.isNullOrEmpty(dbName)) {
sb.append(" FROM ");
sb.append(ClusterNamespace.getNameFromFullName(dbName));
}
if (mvName != null) {
sb.append(" ON ");
sb.append(mvName.toSql());
}
return sb.toString();
}
@Override
public String toString() {
return toSql();
}
}

View File

@ -1,142 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.UserException;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
public class ShowMTMVTaskStmt extends ShowStmt {
private final String taskId; // optional
private final String dbName; // optional
private final TableName mvName; // optional
private String analyzedDbName;
public ShowMTMVTaskStmt() {
this.taskId = null;
this.dbName = null;
this.mvName = null;
}
public ShowMTMVTaskStmt(String taskId) {
this.taskId = taskId;
this.dbName = null;
this.mvName = null;
}
public ShowMTMVTaskStmt(String dbName, TableName mvName) {
this.taskId = null;
this.dbName = dbName;
this.mvName = mvName;
}
public boolean isShowAllTasks() {
return analyzedDbName == null && mvName == null && taskId == null;
}
public boolean isShowAllTasksFromDb() {
return analyzedDbName != null && mvName == null;
}
public boolean isShowAllTasksOnMv() {
return mvName != null;
}
public boolean isSpecificTask() {
return taskId != null;
}
public String getDbName() {
return analyzedDbName;
}
public String getMVName() {
return mvName != null ? mvName.getTbl() : null;
}
public String getTaskId() {
return taskId;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (dbName != null && mvName != null && mvName.getDb() != null && !dbName.equals(mvName.getDb())) {
throw new UserException("Database name should be same when they both been set.");
}
analyzedDbName = dbName;
if (Strings.isNullOrEmpty(analyzedDbName)) {
if (mvName != null) {
analyzedDbName = mvName.getDb();
}
if (Strings.isNullOrEmpty(analyzedDbName)) {
analyzedDbName = analyzer.getDefaultDb();
}
}
if (!Strings.isNullOrEmpty(analyzedDbName)) {
analyzedDbName = ClusterNamespace.getFullName(getClusterName(), analyzedDbName);
}
}
@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : MTMVTask.SHOW_TITLE_NAMES) {
if (title.equals("Query") || title.equals("ErrorMessage")) {
builder.addColumn(new Column(title, ScalarType.createVarchar(10240)));
} else {
builder.addColumn(new Column(title, ScalarType.createVarchar(1024)));
}
}
return builder.build();
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("SHOW MTMV TASK");
if (taskId != null) {
sb.append(" FOR ");
sb.append(getTaskId());
}
if (dbName != null) {
sb.append(" FROM ");
sb.append(ClusterNamespace.getNameFromFullName(dbName));
}
if (mvName != null) {
sb.append(" ON ");
sb.append(mvName.toSql());
}
return sb.toString();
}
@Override
public String toString() {
return toSql();
}
}

View File

@ -38,7 +38,6 @@ import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterMaterializedViewStmt;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
@ -50,7 +49,6 @@ import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.CreateTableLikeStmt;
import org.apache.doris.analysis.CreateTableStmt;
@ -70,7 +68,6 @@ import org.apache.doris.analysis.PartitionRenameClause;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
@ -175,11 +172,9 @@ import org.apache.doris.master.MetaHelper;
import org.apache.doris.master.PartitionInMemoryInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterMultiMaterializedView;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
@ -473,8 +468,6 @@ public class Env {
private PolicyMgr policyMgr;
private MTMVJobManager mtmvJobManager;
private AnalysisManager analysisManager;
private ExternalMetaCacheMgr extMetaCacheMgr;
@ -580,10 +573,6 @@ public class Env {
return catalogMgr;
}
public MTMVJobManager getMTMVJobManager() {
return mtmvJobManager;
}
public ExternalMetaCacheMgr getExtMetaCacheMgr() {
return extMetaCacheMgr;
}
@ -725,7 +714,6 @@ public class Env {
this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
this.refreshManager = new RefreshManager();
this.policyMgr = new PolicyMgr();
this.mtmvJobManager = new MTMVJobManager();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
@ -1583,8 +1571,6 @@ public class Env {
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}
// start mtmv jobManager
mtmvJobManager.start();
getRefreshManager().start();
// binlog gcer
@ -1635,8 +1621,6 @@ public class Env {
MetricRepo.init();
// stop mtmv scheduler
mtmvJobManager.stop();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
@ -2102,15 +2086,6 @@ public class Env {
return checksum;
}
/**
* Load mtmv jobManager.
**/
public long loadMTMVJobManager(DataInputStream in, long checksum) throws IOException {
this.mtmvJobManager = MTMVJobManager.read(in, checksum);
LOG.info("finished replay mtmv job and tasks from image");
return checksum;
}
/**
* Load global function.
**/
@ -2362,12 +2337,6 @@ public class Env {
return checksum;
}
public long saveMTMVJobManager(CountingDataOutputStream out, long checksum) throws IOException {
Env.getCurrentEnv().getMTMVJobManager().write(out, checksum);
LOG.info("Save mtmv job and tasks to image");
return checksum;
}
public long saveGlobalFunction(CountingDataOutputStream out, long checksum) throws IOException {
this.globalFunctionMgr.write(out);
LOG.info("Save global function to image");
@ -3032,10 +3001,6 @@ public class Env {
}
sb.append("\n) ENGINE=");
sb.append(table.getType().name());
} else {
MaterializedView materializedView = ((MaterializedView) table);
sb.append("\n").append("BUILD ").append(materializedView.getBuildMode())
.append(materializedView.getRefreshInfo().toString());
}
if (table.getType() == TableType.OLAP || table.getType() == TableType.MATERIALIZED_VIEW) {
@ -3434,10 +3399,6 @@ public class Env {
sb.append("\n)");
}
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("\nAS ").append(((MaterializedView) table).getQuery());
}
createTableStmt.add(sb + ";");
// 2. add partition
@ -4060,11 +4021,6 @@ public class Env {
this.alter.processAlterTable(stmt);
}
public void alterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException {
AlterMultiMaterializedView alter = new AlterMultiMaterializedView(stmt.getTable(), stmt.getRefreshInfo());
this.alter.processAlterMaterializedView(alter, false);
}
/**
* used for handling AlterViewStmt (the ALTER VIEW command).
*/
@ -4077,18 +4033,10 @@ public class Env {
this.alter.processCreateMaterializedView(stmt);
}
public void createMultiTableMaterializedView(CreateMultiTableMaterializedViewStmt stmt) throws UserException {
this.alter.processCreateMultiTableMaterializedView(stmt);
}
public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
this.alter.processDropMaterializedView(stmt);
}
public void refreshMaterializedView(RefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
this.alter.processRefreshMaterializedView(stmt);
}
/*
* used for handling CancelAlterStmt (for client is the CANCEL ALTER
* command). including SchemaChangeHandler and RollupHandler

View File

@ -1,129 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.MVRefreshInfo;
import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
import org.apache.doris.catalog.OlapTableFactory.MaterializedViewParams;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class MaterializedView extends OlapTable {
@SerializedName("buildMode")
private BuildMode buildMode;
@SerializedName("refreshInfo")
private MVRefreshInfo refreshInfo;
@SerializedName("query")
private String query;
private final ReentrantLock mvTaskLock = new ReentrantLock(true);
public boolean tryLockMVTask() {
try {
return mvTaskLock.tryLock(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return false;
}
}
public void unLockMVTask() {
this.mvTaskLock.unlock();
}
// For deserialization
public MaterializedView() {
type = TableType.MATERIALIZED_VIEW;
}
MaterializedView(MaterializedViewParams params) {
super(
params.tableId,
params.tableName,
params.schema,
params.keysType,
params.partitionInfo,
params.distributionInfo
);
type = TableType.MATERIALIZED_VIEW;
buildMode = params.buildMode;
refreshInfo = params.mvRefreshInfo;
query = params.queryStmt.toSqlWithHint();
}
public BuildMode getBuildMode() {
return buildMode;
}
public MVRefreshInfo getRefreshInfo() {
return refreshInfo;
}
public void setRefreshInfo(MVRefreshInfo info) {
refreshInfo = info;
}
public String getQuery() {
return query;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
MaterializedView materializedView = GsonUtils.GSON.fromJson(Text.readString(in), this.getClass());
refreshInfo = materializedView.refreshInfo;
query = materializedView.query;
buildMode = materializedView.buildMode;
}
public MaterializedView clone(String mvName) throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeConstants.meta_version);
metaContext.setThreadLocalInfo();
try {
ByteArrayOutputStream out = new ByteArrayOutputStream(256);
MaterializedView cloned = new MaterializedView();
this.write(new DataOutputStream(out));
cloned.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
cloned.setName(mvName);
return cloned;
} finally {
MetaContext.remove();
}
}
}

View File

@ -17,12 +17,8 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.analysis.MVRefreshInfo;
import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.catalog.TableIf.TableType;
import com.google.common.base.Preconditions;
@ -44,18 +40,10 @@ public class OlapTableFactory {
public TableIndexes indexes;
}
public static class MaterializedViewParams extends BuildParams {
public MVRefreshInfo.BuildMode buildMode;
public MVRefreshInfo mvRefreshInfo;
public QueryStmt queryStmt;
}
private BuildParams params;
public static TableType getTableType(DdlStmt stmt) {
if (stmt instanceof CreateMultiTableMaterializedViewStmt) {
return TableType.MATERIALIZED_VIEW;
} else if (stmt instanceof CreateTableStmt) {
if (stmt instanceof CreateTableStmt) {
return TableType.OLAP;
} else {
throw new IllegalArgumentException("Invalid DDL statement: " + stmt.toSql());
@ -63,32 +51,22 @@ public class OlapTableFactory {
}
public OlapTableFactory init(TableType type) {
params = (type == TableType.OLAP) ? new OlapTableParams() : new MaterializedViewParams();
params = new OlapTableParams();
return this;
}
public Table build() {
Preconditions.checkNotNull(params, "The factory isn't initialized.");
if (params instanceof OlapTableParams) {
OlapTableParams olapTableParams = (OlapTableParams) params;
return new OlapTable(
olapTableParams.tableId,
olapTableParams.tableName,
olapTableParams.schema,
olapTableParams.keysType,
olapTableParams.partitionInfo,
olapTableParams.distributionInfo,
olapTableParams.indexes
);
} else {
MaterializedViewParams materializedViewParams = (MaterializedViewParams) params;
return new MaterializedView(materializedViewParams);
}
}
public BuildParams getBuildParams() {
return params;
OlapTableParams olapTableParams = (OlapTableParams) params;
return new OlapTable(
olapTableParams.tableId,
olapTableParams.tableName,
olapTableParams.schema,
olapTableParams.keysType,
olapTableParams.partitionInfo,
olapTableParams.distributionInfo,
olapTableParams.indexes
);
}
public OlapTableFactory withTableId(long tableId) {
@ -129,38 +107,8 @@ public class OlapTableFactory {
return this;
}
public OlapTableFactory withQueryStmt(QueryStmt queryStmt) {
Preconditions.checkState(params instanceof MaterializedViewParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MaterializedViewParams materializedViewParams = (MaterializedViewParams) params;
materializedViewParams.queryStmt = queryStmt;
return this;
}
private OlapTableFactory withBuildMode(BuildMode buildMode) {
MaterializedViewParams materializedViewParams = (MaterializedViewParams) params;
materializedViewParams.buildMode = buildMode;
return this;
}
public OlapTableFactory withRefreshInfo(MVRefreshInfo mvRefreshInfo) {
Preconditions.checkState(params instanceof MaterializedViewParams, "Invalid argument for "
+ params.getClass().getSimpleName());
MaterializedViewParams materializedViewParams = (MaterializedViewParams) params;
materializedViewParams.mvRefreshInfo = mvRefreshInfo;
return this;
}
public OlapTableFactory withExtraParams(DdlStmt stmt) {
boolean isMaterializedView = stmt instanceof CreateMultiTableMaterializedViewStmt;
if (!isMaterializedView) {
CreateTableStmt createOlapTableStmt = (CreateTableStmt) stmt;
return withIndexes(new TableIndexes(createOlapTableStmt.getIndexes()));
} else {
CreateMultiTableMaterializedViewStmt createMVStmt = (CreateMultiTableMaterializedViewStmt) stmt;
return withBuildMode(createMVStmt.getBuildMode())
.withRefreshInfo(createMVStmt.getRefreshInfo())
.withQueryStmt(createMVStmt.getQueryStmt());
}
CreateTableStmt createOlapTableStmt = (CreateTableStmt) stmt;
return withIndexes(new TableIndexes(createOlapTableStmt.getIndexes()));
}
}

View File

@ -363,8 +363,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
TableType type = TableType.valueOf(Text.readString(in));
if (type == TableType.OLAP) {
table = new OlapTable();
} else if (type == TableType.MATERIALIZED_VIEW) {
table = new MaterializedView();
} else if (type == TableType.ODBC) {
table = new OdbcTable();
} else if (type == TableType.MYSQL) {

View File

@ -79,7 +79,6 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.MysqlDb;
@ -137,8 +136,6 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.mtmv.MTMVJobFactory;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.ColocatePersistInfo;
@ -888,7 +885,7 @@ public class InternalCatalog implements CatalogIf<Database> {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "VIEW");
}
} else {
if (table instanceof View || (!stmt.isMaterializedView() && table instanceof MaterializedView)) {
if (table instanceof View) {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_OBJECT, dbName, tableName, "TABLE");
}
}
@ -953,13 +950,6 @@ public class InternalCatalog implements CatalogIf<Database> {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, isReplay);
}
}
if (table instanceof MaterializedView) {
List<Long> dropIds = Env.getCurrentEnv().getMTMVJobManager().showJobs(db.getFullName(), table.getName())
.stream().map(MTMVJob::getId).collect(Collectors.toList());
Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, isReplay);
LOG.info("Drop related {} mv job.", dropIds.size());
}
LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
return true;
}
@ -2601,14 +2591,6 @@ public class InternalCatalog implements CatalogIf<Database> {
throw e;
}
if (olapTable instanceof MaterializedView && MTMVJobFactory.isGenerateJob((MaterializedView) olapTable)) {
List<MTMVJob> jobs = MTMVJobFactory.buildJob((MaterializedView) olapTable, db.getFullName());
for (MTMVJob job : jobs) {
Env.getCurrentEnv().getMTMVJobManager().createJob(job, false);
}
LOG.info("Create related {} mv job.", jobs.size());
}
}
private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {

View File

@ -55,15 +55,9 @@ import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.persist.AlterMultiMaterializedView;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.AlterUserOperationLog;
import org.apache.doris.persist.AlterViewInfo;
@ -789,38 +783,13 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_CREATE_MTMV_JOB: {
data = MTMVJob.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_MTMV_JOB: {
data = DropMTMVJob.read(in);
isRead = true;
break;
}
case OperationType.OP_CHANGE_MTMV_JOB: {
data = ChangeMTMVJob.read(in);
isRead = true;
break;
}
case OperationType.OP_CREATE_MTMV_TASK: {
data = MTMVTask.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_MTMV_TASK: {
data = DropMTMVTask.read(in);
isRead = true;
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
Text.readString(in);
isRead = true;
break;
}
case OperationType.OP_CREATE_MTMV_JOB:
case OperationType.OP_CHANGE_MTMV_JOB:
case OperationType.OP_DROP_MTMV_JOB:
case OperationType.OP_CREATE_MTMV_TASK:
case OperationType.OP_CHANGE_MTMV_TASK:
case OperationType.OP_DROP_MTMV_TASK:
case OperationType.OP_ALTER_MTMV_STMT: {
data = AlterMultiMaterializedView.read(in);
isRead = true;
break;
}

View File

@ -1,117 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
import org.apache.doris.analysis.MVRefreshInfo.RefreshTrigger;
import org.apache.doris.analysis.MVRefreshIntervalTriggerInfo;
import org.apache.doris.analysis.MVRefreshTriggerInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class MTMVJobFactory {
private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
public static boolean isGenerateJob(MaterializedView materializedView) {
boolean completeRefresh = materializedView.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE;
BuildMode buildMode = materializedView.getBuildMode();
MVRefreshTriggerInfo triggerInfo = materializedView.getRefreshInfo().getTriggerInfo();
//can not generate a job when creating a temp materialized view.
if (materializedView.getName().startsWith(FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX)) {
return false;
}
if (buildMode == BuildMode.IMMEDIATE) {
return completeRefresh;
} else {
return completeRefresh && triggerInfo != null && triggerInfo.getRefreshTrigger() == RefreshTrigger.INTERVAL;
}
}
public static List<MTMVJob> buildJob(MaterializedView materializedView, String dbName) {
List<MTMVJob> jobs = new ArrayList<>();
MVRefreshTriggerInfo triggerInfo = materializedView.getRefreshInfo().getTriggerInfo();
boolean isRunPeriodJobImmediate = false;
if (triggerInfo != null && triggerInfo.getRefreshTrigger() == RefreshTrigger.INTERVAL) {
MTMVJob job = genPeriodicalJob(materializedView, dbName);
isRunPeriodJobImmediate = MTMVUtils.getDelaySeconds(job) == 0;
jobs.add(job);
}
// if the PeriodicalJob run immediate since an early start time, don't run the immediate build.
if (!isRunPeriodJobImmediate && materializedView.getBuildMode() == BuildMode.IMMEDIATE) {
jobs.add(genOnceJob(materializedView, dbName));
}
return jobs;
}
private static MTMVJob genPeriodicalJob(MaterializedView materializedView, String dbName) {
String uid = UUID.randomUUID().toString();
MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
job.setId(Env.getCurrentEnv().getNextId());
job.setTriggerMode(TriggerMode.PERIODICAL);
job.setSchedule(genJobSchedule(materializedView));
job.setDBName(dbName);
job.setMVName(materializedView.getName());
job.setQuery(materializedView.getQuery());
job.setCreateTime(MTMVUtils.getNowTimeStamp());
return job;
}
public static MTMVJob genOnceJob(MaterializedView materializedView, String dbName) {
String uid = UUID.randomUUID().toString();
MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
job.setId(Env.getCurrentEnv().getNextId());
job.setTriggerMode(TriggerMode.ONCE);
job.setDBName(dbName);
job.setMVName(materializedView.getName());
job.setQuery(materializedView.getQuery());
long nowTimeStamp = MTMVUtils.getNowTimeStamp();
job.setCreateTime(nowTimeStamp);
job.setExpireTime(nowTimeStamp + Config.scheduler_mtmv_job_expired);
return job;
}
private static JobSchedule genJobSchedule(MaterializedView materializedView) {
MVRefreshIntervalTriggerInfo info = materializedView.getRefreshInfo().getTriggerInfo().getIntervalTrigger();
long startTime;
try {
LocalDateTime dateTime = LocalDateTime.parse(info.getStartTime(), TimeUtils.DATETIME_FORMAT);
startTime = dateTime.toEpochSecond(TimeUtils.TIME_ZONE.getRules().getOffset(dateTime));
} catch (DateTimeParseException e) {
throw new RuntimeException(e);
}
return new JobSchedule(startTime, info.getInterval(), MTMVUtils.getTimeUint(info.getTimeUnit()));
}
}

View File

@ -1,422 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.MTMVCheckpointData;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class MTMVJobManager {
private static final Logger LOG = LogManager.getLogger(MTMVJobManager.class);
// make sure that metrics were registered only once.
private static volatile boolean metricsRegistered = false;
private final Map<Long, MTMVJob> idToJobMap;
private final Map<String, MTMVJob> nameToJobMap;
private final MTMVTaskManager taskManager;
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-period-scheduler"));
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
private final ReentrantReadWriteLock rwLock;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
public MTMVJobManager() {
idToJobMap = Maps.newConcurrentMap();
nameToJobMap = Maps.newConcurrentMap();
rwLock = new ReentrantReadWriteLock(true);
taskManager = new MTMVTaskManager();
}
public void start() {
if (isStarted.compareAndSet(false, true)) {
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-period-scheduler"));
}
registerJobs();
if (cleanerScheduler.isShutdown()) {
cleanerScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
}
cleanerScheduler.scheduleAtFixedRate(() -> {
if (!Env.getCurrentEnv().isMaster()) {
LOG.warn("only master can run MTMVJob");
return;
}
removeExpiredJobs();
taskManager.removeExpiredTasks();
}, 0, 1, TimeUnit.MINUTES);
taskManager.startTaskScheduler();
initMetrics();
}
}
private void initMetrics() {
if (metricsRegistered) {
return;
}
metricsRegistered = true;
// total jobs
GaugeMetric<Integer> totalJob = new GaugeMetric<Integer>("mtmv_job",
Metric.MetricUnit.NOUNIT, "Total job number of mtmv.") {
@Override
public Integer getValue() {
return nameToJobMap.size();
}
};
totalJob.addLabel(new MetricLabel("type", "TOTAL-JOB"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalJob);
// active jobs
GaugeMetric<Integer> activeJob = new GaugeMetric<Integer>("mtmv_job",
Metric.MetricUnit.NOUNIT, "Active job number of mtmv.") {
@Override
public Integer getValue() {
int result = 0;
for (MTMVJob job : getAllJobsWithLock()) {
if (job.getState() == JobState.ACTIVE) {
result++;
}
}
return result;
}
};
activeJob.addLabel(new MetricLabel("type", "ACTIVE-JOB"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(activeJob);
// total tasks
GaugeMetric<Integer> totalTask = new GaugeMetric<Integer>("mtmv_task",
Metric.MetricUnit.NOUNIT, "Total task number of mtmv.") {
@Override
public Integer getValue() {
return getTaskManager().getHistoryTasks().size();
}
};
totalTask.addLabel(new MetricLabel("type", "TOTAL-TASK"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalTask);
// running tasks
GaugeMetric<Integer> runningTask = new GaugeMetric<Integer>("mtmv_task",
Metric.MetricUnit.NOUNIT, "Running task number of mtmv.") {
@Override
public Integer getValue() {
return getTaskManager().getRunningTaskMap().size();
}
};
runningTask.addLabel(new MetricLabel("type", "RUNNING-TASK"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(runningTask);
// pending tasks
GaugeMetric<Integer> pendingTask = new GaugeMetric<Integer>("mtmv_task",
Metric.MetricUnit.NOUNIT, "Pending task number of mtmv.") {
@Override
public Integer getValue() {
return getTaskManager().getPendingTaskMap().size();
}
};
pendingTask.addLabel(new MetricLabel("type", "PENDING-TASK"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(pendingTask);
// failed tasks
GaugeMetric<Integer> failedTask = new GaugeMetric<Integer>("mtmv_task",
Metric.MetricUnit.NOUNIT, "Failed task number of mtmv.") {
@Override
public Integer getValue() {
return getTaskManager().getFailedTaskCount();
}
};
failedTask.addLabel(new MetricLabel("type", "FAILED-TASK"));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(failedTask);
}
public void stop() {
if (isStarted.compareAndSet(true, false)) {
periodScheduler.shutdown();
cleanerScheduler.shutdown();
taskManager.stopTaskScheduler();
}
}
private void registerJobs() {
for (MTMVJob job : getAllJobsWithLock()) {
job.start();
}
}
public void createJob(MTMVJob job, boolean isReplay) throws DdlException {
createJobWithLock(job, isReplay);
if (!isReplay) {
job.start();
}
}
private void createJobWithLock(MTMVJob job, boolean isReplay) throws DdlException {
writeLock();
try {
if (nameToJobMap.containsKey(job.getName())) {
throw new DdlException("Job [" + job.getName() + "] already exists");
}
nameToJobMap.put(job.getName(), job);
idToJobMap.put(job.getId(), job);
if (!isReplay) {
// log job before submit any task.
Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
}
} finally {
writeUnlock();
}
}
public void refreshMTMV(String dbName, String mvName)
throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
MaterializedView mv = (MaterializedView) db.getTableOrMetaException(mvName, TableType.MATERIALIZED_VIEW);
MTMVJob mtmvJob = MTMVJobFactory.genOnceJob(mv, dbName);
createJob(mtmvJob, false);
}
public void dropJobByName(String dbName, String mvName, boolean isReplay) {
List<Long> jobIds = Lists.newArrayList();
for (String jobName : nameToJobMap.keySet()) {
MTMVJob job = nameToJobMap.get(jobName);
if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) {
jobIds.add(job.getId());
}
}
dropJobs(jobIds, isReplay);
}
public void dropJobs(List<Long> jobIds, boolean isReplay) {
if (jobIds.isEmpty()) {
return;
}
for (long jobId : jobIds) {
dropJob(jobId, isReplay);
}
LOG.info("drop jobs:{}", jobIds);
}
private void dropJob(long jobId, boolean isReplay) {
MTMVJob job = dropJobWithLock(jobId, isReplay);
if (!isReplay && job != null) {
job.stop();
}
}
private MTMVJob dropJobWithLock(long jobId, boolean isReplay) {
writeLock();
try {
MTMVJob job = idToJobMap.get(jobId);
if (job == null) {
LOG.warn("drop jobId {} failed because job is null", jobId);
return null;
}
idToJobMap.remove(job.getId());
nameToJobMap.remove(job.getName());
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropMTMVJob(Collections.singletonList(jobId));
}
return job;
} finally {
writeUnlock();
}
}
public List<MTMVJob> showAllJobs() {
return showJobs(null);
}
public List<MTMVJob> showJobs(String dbName) {
List<MTMVJob> jobList = Lists.newArrayList();
if (Strings.isNullOrEmpty(dbName)) {
jobList.addAll(getAllJobsWithLock());
} else {
jobList.addAll(getAllJobsWithLock().stream().filter(u -> u.getDBName().equals(dbName))
.collect(Collectors.toList()));
}
return jobList.stream().sorted().collect(Collectors.toList());
}
public List<MTMVJob> showJobs(String dbName, String mvName) {
return showJobs(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
}
public void replayCreateJob(MTMVJob job) {
if (job.getExpireTime() > 0 && MTMVUtils.getNowTimeStamp() > job.getExpireTime()) {
return;
}
try {
createJob(job, true);
} catch (DdlException e) {
LOG.warn("failed to replay create job [{}]", job.getName(), e);
}
}
public void replayDropJobs(List<Long> jobIds) {
dropJobs(jobIds, true);
}
public void replayUpdateJob(ChangeMTMVJob changeJob) {
MTMVJob mtmvJob = idToJobMap.get(changeJob.getJobId());
if (mtmvJob != null) {
mtmvJob.updateJob(changeJob, true);
}
}
public void replayCreateJobTask(MTMVTask task) {
taskManager.replayCreateJobTask(task);
}
public void replayDropJobTasks(List<String> taskIds) {
taskManager.dropHistoryTasks(taskIds, true);
}
private void removeExpiredJobs() {
long currentTimeSeconds = MTMVUtils.getNowTimeStamp();
List<Long> jobIdsToDelete = Lists.newArrayList();
List<MTMVJob> jobs = getAllJobsWithLock();
for (MTMVJob job : jobs) {
// active job should not clean
if (job.getState() != MTMVUtils.JobState.COMPLETE) {
continue;
}
long expireTime = job.getExpireTime();
if (expireTime > 0 && currentTimeSeconds > expireTime) {
jobIdsToDelete.add(job.getId());
}
}
dropJobs(jobIdsToDelete, false);
}
public MTMVJob getJob(String jobName) {
return nameToJobMap.get(jobName);
}
private List<MTMVJob> getAllJobsWithLock() {
readLock();
try {
return Lists.newArrayList(nameToJobMap.values());
} finally {
readUnlock();
}
}
public MTMVTaskManager getTaskManager() {
return taskManager;
}
public ScheduledExecutorService getPeriodScheduler() {
return periodScheduler;
}
private void readLock() {
this.rwLock.readLock().lock();
}
private void readUnlock() {
this.rwLock.readLock().unlock();
}
private void writeLock() {
this.rwLock.writeLock().lock();
}
private void writeUnlock() {
this.rwLock.writeLock().unlock();
}
public long write(DataOutputStream dos, long checksum) throws IOException {
MTMVCheckpointData data = new MTMVCheckpointData();
data.jobs = new ArrayList<>(nameToJobMap.values());
data.tasks = Lists.newArrayList(taskManager.getHistoryTasks());
String s = GsonUtils.GSON.toJson(data);
Text.writeString(dos, s);
return checksum;
}
public static MTMVJobManager read(DataInputStream dis, long checksum) throws IOException {
MTMVJobManager mtmvJobManager = new MTMVJobManager();
String s = Text.readString(dis);
MTMVCheckpointData data = GsonUtils.GSON.fromJson(s, MTMVCheckpointData.class);
if (data != null) {
if (data.jobs != null) {
for (MTMVJob job : data.jobs) {
mtmvJobManager.replayCreateJob(job);
}
}
if (data.tasks != null) {
for (MTMVTask runStatus : data.tasks) {
mtmvJobManager.replayCreateJobTask(runStatus);
}
}
}
return mtmvJobManager;
}
}

View File

@ -1,72 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.qe.ConnectContext;
import java.util.Map;
public class MTMVTaskContext {
ConnectContext ctx;
String query;
Map<String, String> properties;
MTMVTask task;
MTMVJob job;
public ConnectContext getCtx() {
return ctx;
}
public void setTask(MTMVTask task) {
this.task = task;
}
public MTMVTask getTask() {
return this.task;
}
public void setJob(MTMVJob job) {
this.job = job;
}
public MTMVJob getJob() {
return this.job;
}
public void setCtx(ConnectContext ctx) {
this.ctx = ctx;
}
public String getQuery() {
return query;
}
public void setQuery(String query) {
this.query = query;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
}

View File

@ -1,36 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.mtmv.MTMVUtils.TaskPriority;
public class MTMVTaskExecuteParams {
private int priority = TaskPriority.LOW.value();
public MTMVTaskExecuteParams() {
}
public MTMVTaskExecuteParams(int priority) {
this.priority = priority;
}
public int getPriority() {
return priority;
}
}

View File

@ -1,181 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;
public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
private static final Logger LOG = LogManager.getLogger(MTMVTaskExecutor.class);
private long jobId;
private Map<String, String> properties;
private Future<?> future;
private MTMVJob job;
private ConnectContext ctx;
private MTMVTaskProcessor processor;
private MTMVTask task;
public long getJobId() {
return jobId;
}
public void setJobId(long jobId) {
this.jobId = jobId;
}
public MTMVJob getJob() {
return job;
}
public void setJob(MTMVJob job) {
this.job = job;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public Future<?> getFuture() {
return future;
}
public void setFuture(Future<?> future) {
this.future = future;
}
public MTMVTaskProcessor getProcessor() {
return processor;
}
public void setProcessor(MTMVTaskProcessor processor) {
this.processor = processor;
}
public boolean executeTask() throws Exception {
MTMVTaskContext taskContext = new MTMVTaskContext();
taskContext.setQuery(task.getQuery());
ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDBName()));
ctx.setDatabase(job.getDBName());
ctx.setQualifiedUser(task.getUser());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
ctx.getState().reset();
UUID taskId = UUID.fromString(task.getTaskId());
TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
ctx.setQueryId(queryId);
taskContext.setCtx(ctx);
taskContext.setTask(task);
taskContext.setJob(job);
Map<String, String> properties = Maps.newHashMap();
taskContext.setProperties(properties);
return processor.process(taskContext);
}
public ConnectContext getCtx() {
return ctx;
}
public MTMVTask getTask() {
return task;
}
public void setTask(MTMVTask task) {
this.task = task;
}
public MTMVTask initTask(String taskId, Long createTime) {
MTMVTask task = new MTMVTask();
task.setTaskId(taskId);
task.setJobName(job.getName());
if (createTime == null) {
task.setCreateTime(MTMVUtils.getNowTimeStamp());
} else {
task.setCreateTime(createTime);
}
task.setMVName(job.getMVName());
task.setUser(job.getUser());
task.setDBName(job.getDBName());
task.setQuery(job.getQuery());
task.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_task_expired);
task.setRetryTimes(job.getRetryPolicy().getTimes());
this.task = task;
return task;
}
public void stop() {
if (ctx != null) {
ctx.kill(false);
}
}
@Override
public int compareTo(@NotNull MTMVTaskExecutor task) {
if (this.getTask().getPriority() != task.getTask().getPriority()) {
return task.getTask().getPriority() - this.getTask().getPriority();
} else {
return this.getTask().getCreateTime() > task.getTask().getCreateTime() ? 1 : -1;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MTMVTaskExecutor task = (MTMVTaskExecutor) o;
return this.task.getQuery().equals(task.getTask().getQuery());
}
@Override
public int hashCode() {
return Objects.hash(task);
}
}

View File

@ -1,82 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MTMVTaskExecutorPool {
private static final Logger LOG = LogManager.getLogger(MTMVTaskExecutorPool.class);
private static final long RETRY_INTERVAL = TimeUnit.SECONDS.toMillis(30);
private final ExecutorService taskPool = Executors.newCachedThreadPool();
public void executeTask(MTMVTaskExecutor taskExecutor) {
if (taskExecutor == null) {
return;
}
MTMVTask task = taskExecutor.getTask();
if (task == null) {
return;
}
if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) {
LOG.warn("Task {} is in final status {} ", task.getTaskId(), task.getState());
return;
}
Future<?> future = taskPool.submit(() -> {
task.setState(TaskState.RUNNING);
int retryTimes = task.getRetryTimes();
boolean isSuccess = false;
do {
try {
isSuccess = taskExecutor.executeTask();
if (isSuccess) {
task.setState(TaskState.SUCCESS);
break;
}
} catch (Throwable t) {
LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t);
}
retryTimes--;
if (retryTimes > 0) {
try {
Thread.sleep(RETRY_INTERVAL);
} catch (InterruptedException e) {
LOG.warn("Failed to sleep.", e);
break;
}
}
} while (!isSuccess && retryTimes > 0);
if (!isSuccess) {
task.setState(TaskState.FAILURE);
task.setErrorCode(-1);
}
task.setFinishTime(MTMVUtils.getNowTimeStamp());
});
taskExecutor.setFuture(future);
}
}

View File

@ -1,350 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class MTMVTaskManager {
private static final Logger LOG = LogManager.getLogger(MTMVTaskManager.class);
// jobId -> pending tasks, one job can dispatch many tasks
private final Map<Long, PriorityBlockingQueue<MTMVTaskExecutor>> pendingTaskMap = Maps.newConcurrentMap();
// jobId -> running tasks, only one task will be running for one job.
private final Map<Long, MTMVTaskExecutor> runningTaskMap = Maps.newConcurrentMap();
private final MTMVTaskExecutorPool taskExecutorPool = new MTMVTaskExecutorPool();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
// keep track of all the completed tasks
private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mtmv-task-scheduler"));
private final AtomicInteger failedTaskCount = new AtomicInteger(0);
public void startTaskScheduler() {
if (taskScheduler.isShutdown()) {
taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler"));
}
taskScheduler.scheduleAtFixedRate(() -> {
checkRunningTask();
scheduledPendingTask();
}, 0, 1, TimeUnit.SECONDS);
}
public void stopTaskScheduler() {
taskScheduler.shutdown();
}
private void checkRunningTask() {
writeLock();
try {
Iterator<Long> runningIterator = runningTaskMap.keySet().iterator();
while (runningIterator.hasNext()) {
Long jobId = runningIterator.next();
MTMVTaskExecutor taskExecutor = runningTaskMap.get(jobId);
Future<?> future = taskExecutor.getFuture();
if (future.isDone()) {
runningIterator.remove();
addHistory(taskExecutor.getTask());
MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
if (finalState == TaskState.FAILURE) {
failedTaskCount.incrementAndGet();
}
//task save final state only
Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
taskExecutor.getJob().taskFinished();
}
}
} finally {
writeUnlock();
}
}
private void scheduledPendingTask() {
writeLock();
try {
int currentRunning = runningTaskMap.size();
Iterator<Long> pendingIterator = pendingTaskMap.keySet().iterator();
while (pendingIterator.hasNext()) {
Long jobId = pendingIterator.next();
MTMVTaskExecutor runningTaskExecutor = runningTaskMap.get(jobId);
if (runningTaskExecutor == null) {
Queue<MTMVTaskExecutor> taskQueue = pendingTaskMap.get(jobId);
if (taskQueue.size() == 0) {
pendingIterator.remove();
} else {
if (currentRunning >= Config.max_running_mtmv_scheduler_task_num) {
break;
}
MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
taskExecutorPool.executeTask(pendingTaskExecutor);
runningTaskMap.put(jobId, pendingTaskExecutor);
currentRunning++;
}
}
}
} finally {
writeUnlock();
}
}
public MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job) {
return submitJobTask(job, new MTMVTaskExecuteParams());
}
private MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job, MTMVTaskExecuteParams param) {
return submitTask(MTMVUtils.buildTask(job), param);
}
private MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
// duplicate submit
if (taskExecutor.getTask() != null) {
return MTMVUtils.TaskSubmitStatus.FAILED;
}
int validPendingCount = 0;
for (Long jobId : pendingTaskMap.keySet()) {
if (!pendingTaskMap.get(jobId).isEmpty()) {
validPendingCount++;
}
}
if (validPendingCount >= Config.max_pending_mtmv_scheduler_task_num) {
LOG.warn("pending task exceeds pending_scheduler_task_size:{}, reject the submit.",
Config.max_pending_mtmv_scheduler_task_num);
return MTMVUtils.TaskSubmitStatus.REJECTED;
}
String taskId = UUID.randomUUID().toString();
MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp());
task.setPriority(params.getPriority());
LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName());
arrangeToPendingTask(taskExecutor);
return MTMVUtils.TaskSubmitStatus.SUBMITTED;
}
private void arrangeToPendingTask(MTMVTaskExecutor task) {
writeLock();
try {
long jobId = task.getJobId();
PriorityBlockingQueue<MTMVTaskExecutor> tasks =
pendingTaskMap.computeIfAbsent(jobId, u -> Queues.newPriorityBlockingQueue());
tasks.offer(task);
} finally {
writeUnlock();
}
}
public void dealJobRemoved(MTMVJob job) {
removePendingTask(job.getId());
removeRunningTask(job.getId());
if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
clearHistoryTasksByJobName(job.getName(), false);
}
}
private void removePendingTask(Long jobId) {
pendingTaskMap.remove(jobId);
}
private void removeRunningTask(Long jobId) {
MTMVTaskExecutor task = runningTaskMap.remove(jobId);
if (task != null) {
task.stop();
}
}
public int getFailedTaskCount() {
return failedTaskCount.get();
}
public Map<Long, PriorityBlockingQueue<MTMVTaskExecutor>> getPendingTaskMap() {
return pendingTaskMap;
}
public Map<Long, MTMVTaskExecutor> getRunningTaskMap() {
return runningTaskMap;
}
private void addHistory(MTMVTask task) {
historyTasks.addFirst(task);
}
public Deque<MTMVTask> getHistoryTasks() {
return historyTasks;
}
public List<MTMVTask> getHistoryTasksByJobName(String jobName) {
return getHistoryTasks().stream().filter(u -> u.getJobName().equals(jobName))
.collect(Collectors.toList());
}
public List<MTMVTask> showAllTasks() {
return showTasksWithLock(null);
}
public List<MTMVTask> showTasksWithLock(String dbName) {
List<MTMVTask> taskList = Lists.newArrayList();
readLock();
try {
if (Strings.isNullOrEmpty(dbName)) {
for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
taskList.addAll(pTaskQueue.stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
}
taskList.addAll(
getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
.collect(Collectors.toList()));
taskList.addAll(getHistoryTasks());
} else {
for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
taskList.addAll(
pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
.collect(Collectors.toList()));
}
taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
.filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
taskList.addAll(
getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName))
.collect(Collectors.toList()));
}
} finally {
readUnlock();
}
return taskList.stream().sorted().collect(Collectors.toList());
}
public List<MTMVTask> showTasks(String dbName, String mvName) {
return showTasksWithLock(dbName).stream().filter(u -> u.getMVName().equals(mvName))
.collect(Collectors.toList());
}
public MTMVTask getTask(String taskId) throws AnalysisException {
List<MTMVTask> tasks =
showAllTasks().stream().filter(u -> u.getTaskId().equals(taskId)).collect(Collectors.toList());
if (tasks.size() == 0) {
throw new AnalysisException("Can't find the task id in the task list.");
} else if (tasks.size() > 1) {
throw new AnalysisException("Find more than one task id in the task list.");
} else {
return tasks.get(0);
}
}
public void replayCreateJobTask(MTMVTask task) {
addHistory(task);
}
private void clearHistoryTasksByJobName(String jobName, boolean isReplay) {
List<String> clearTasks = Lists.newArrayList();
Deque<MTMVTask> taskHistory = getHistoryTasks();
for (MTMVTask task : taskHistory) {
if (task.getJobName().equals(jobName)) {
clearTasks.add(task.getTaskId());
}
}
dropHistoryTasks(clearTasks, isReplay);
}
public void removeExpiredTasks() {
long currentTime = MTMVUtils.getNowTimeStamp();
List<String> historyToDelete = Lists.newArrayList();
Deque<MTMVTask> taskHistory = getHistoryTasks();
for (MTMVTask task : taskHistory) {
long expireTime = task.getExpireTime();
if (currentTime > expireTime) {
historyToDelete.add(task.getTaskId());
}
}
dropHistoryTasks(historyToDelete, false);
}
public void dropHistoryTasks(List<String> taskIds, boolean isReplay) {
if (taskIds.isEmpty()) {
return;
}
writeLock();
try {
Set<String> taskSet = new HashSet<>(taskIds);
// Try to remove history tasks.
getHistoryTasks().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId()));
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
}
} finally {
writeUnlock();
}
LOG.info("drop task history:{}", taskIds);
}
private void readLock() {
this.rwLock.readLock().lock();
}
private void readUnlock() {
this.rwLock.readLock().unlock();
}
private void writeLock() {
this.rwLock.writeLock().lock();
}
private void writeUnlock() {
this.rwLock.writeLock().unlock();
}
}

View File

@ -1,158 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class MTMVTaskProcessor {
private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
boolean process(MTMVTaskContext context) throws Exception {
String taskId = context.getTask().getTaskId();
long jobId = context.getJob().getId();
LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId, jobId);
String mvName = context.getTask().getMVName();
String temporaryMVName = getTemporaryMVName(mvName);
Database db = context.getCtx().getEnv().getInternalCatalog()
.getDbOrMetaException(context.getTask().getDBName());
MaterializedView mv = (MaterializedView) db.getTableOrAnalysisException(mvName);
if (!mv.tryLockMVTask()) {
LOG.warn("Failed to run the MTMV task, taskId={}, jobId={}, msg={}.", taskId, jobId,
"Failed to get the lock");
context.getTask().setMessage("Failed to get the lock.");
return false;
}
try {
// Check whether the temporary materialized view exists, we should drop the obsolete materialized view first
// because it was created by previous tasks which failed to complete their work.
dropMaterializedView(context, temporaryMVName);
// Step 1: create the temporary materialized view.
String createStatement = generateCreateStatement(mv.clone(temporaryMVName));
if (!executeSQL(context, createStatement)) {
throw new RuntimeException(
"Failed to create the temporary materialized view, sql=" + createStatement + ", cause="
+ context.getCtx().getState().getErrorMessage() + ".");
}
// Step 2: insert data to the temporary materialized view.
String insertSelectStatement = generateInsertSelectStmt(context, temporaryMVName);
if (!executeSQL(context, insertSelectStatement)) {
throw new RuntimeException(
"Failed to insert data to the temporary materialized view, sql=" + insertSelectStatement
+ ", cause=" + context.getCtx().getState().getErrorMessage() + ".");
}
String insertInfoMessage = context.getCtx().getState().getInfoMessage();
// Step 3: swap the temporary materialized view with the original materialized view.
String swapStatement = generateSwapStatement(mvName, temporaryMVName);
if (!executeSQL(context, swapStatement)) {
throw new RuntimeException(
"Failed to swap the temporary materialized view with the original materialized view, sql="
+ swapStatement + ", cause=" + context.getCtx().getState().getErrorMessage() + ".");
}
context.getTask().setMessage(insertInfoMessage);
LOG.info("Run MTMV task successfully, taskId={}, jobId={}.", taskId, jobId);
return true;
} catch (Throwable e) {
context.getTask().setMessage(e.getMessage());
throw e;
} finally {
mv.unLockMVTask();
dropMaterializedView(context, temporaryMVName);
}
}
private String getTemporaryMVName(String mvName) {
return FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + mvName;
}
private void dropMaterializedView(MTMVTaskContext context, String mvName) {
String dropStatement = generateDropStatement(mvName);
if (!executeSQL(context, dropStatement)) {
throw new RuntimeException(
"Failed to drop the temporary materialized view, sql=" + dropStatement + ".");
}
}
private String generateDropStatement(String mvName) {
return "DROP MATERIALIZED VIEW IF EXISTS " + mvName;
}
private boolean executeSQL(MTMVTaskContext context, String sql) {
ConnectContext ctx = context.getCtx();
ctx.setThreadLocalInfo();
ctx.getState().reset();
try {
ctx.getSessionVariable().disableNereidsPlannerOnce();
StmtExecutor executor = new StmtExecutor(ctx, sql);
ctx.setExecutor(executor);
executor.execute();
} catch (Throwable e) {
QueryState queryState = new QueryState();
queryState.setError(ErrorCode.ERR_INTERNAL_ERROR, e.getMessage());
ctx.setState(queryState);
} finally {
ConnectContext.remove();
}
if (ctx.getState().getStateType() == MysqlStateType.OK) {
LOG.info("Execute SQL successfully, taskId={}, sql={}.", context.getTask().getTaskId(), sql);
} else {
LOG.warn("Failed to execute SQL, taskId={}, sql={}, errorCode={}, message={}.",
context.getTask().getTaskId(),
sql, ctx.getState().getErrorCode(), ctx.getState().getErrorMessage());
}
return ctx.getState().getStateType() == MysqlStateType.OK;
}
private String generateCreateStatement(MaterializedView mv) {
List<String> createStatement = Lists.newArrayList();
Env.getDdlStmt(mv, createStatement, null, null, false, true /* hide password */, -1L);
return createStatement.stream().findFirst().orElse("");
}
private String generateInsertSelectStmt(MTMVTaskContext context, String temporaryMVName) {
return "INSERT INTO " + temporaryMVName + " " + context.getQuery();
}
// ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false');
private String generateSwapStatement(String mvName, String temporaryMVName) {
return "ALTER TABLE " + mvName + " REPLACE WITH TABLE " + temporaryMVName + " PROPERTIES('swap' = 'false')";
}
}

View File

@ -1,137 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.mtmv.metadata.MTMVJob;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
public class MTMVUtils {
public enum TriggerMode {
MANUAL, ONCE, PERIODICAL, ON_COMMIT
}
/**
* All the job init status: UNKNOWN
* create job: UNKNOWN -> ACTIVE
* after job scheduler:
* - ONCE Job: ACTIVE -> COMPLETE (no matter task success or fail)
* - PERIODICAL Job: ACTIVE -> PAUSE (if the task failed too many times)
* - PERIODICAL Job: ACTIVE -> COMPLETE (when the job is expired.)
* recover job: PAUSE -> ACTIVE (specific command)
*/
public enum JobState {
UNKNOWN, ACTIVE, PAUSE, COMPLETE
}
public enum TaskRetryPolicy {
NEVER(0), // no any retry
TIMES(3), // fix 3 times retry
ALWAYS(100); // treat 100 as big number since most case will cause a lot of resource
private final int times;
TaskRetryPolicy(int times) {
this.times = times;
}
public int getTimes() {
return times;
}
}
public enum TaskState {
PENDING, RUNNING, FAILURE, SUCCESS,
}
public enum TaskSubmitStatus {
SUBMITTED, REJECTED, FAILED
}
public enum TaskPriority {
LOW(0), NORMAL(50), HIGH(100);
private final int value;
TaskPriority(int value) {
this.value = value;
}
public int value() {
return value;
}
}
public static long getDelaySeconds(MTMVJob job) {
return getDelaySeconds(job, LocalDateTime.now());
}
public static long getDelaySeconds(MTMVJob job, LocalDateTime now) {
long lastModifyTime = job.getLastModifyTime();
long nextTime = 0;
// if set lastModifyTime, use lastModifyTime otherwise use the start time.
if (lastModifyTime > 0) {
// check null of schedule outside.
nextTime = lastModifyTime + job.getSchedule().getSecondPeriod();
} else {
nextTime = job.getSchedule().getStartTime();
}
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochSecond(nextTime), ZoneId.systemDefault());
Duration duration = Duration.between(now, time);
long delaySeconds = duration.getSeconds();
// start immediately if startTime < now
return delaySeconds < 0 ? 0 : delaySeconds;
}
public static MTMVTaskExecutor buildTask(MTMVJob job) {
MTMVTaskExecutor taskExecutor = new MTMVTaskExecutor();
taskExecutor.setJobId(job.getId());
taskExecutor.setProperties(job.getProperties());
taskExecutor.setJob(job);
taskExecutor.setProcessor(new MTMVTaskProcessor());
return taskExecutor;
}
public static TimeUnit getTimeUint(String strTimeUnit) {
switch (strTimeUnit.toUpperCase()) {
case "SECOND":
return TimeUnit.SECONDS;
case "MINUTE":
return TimeUnit.MINUTES;
case "HOUR":
return TimeUnit.HOURS;
default:
return TimeUnit.DAYS;
}
}
// In MTMV package, all the timestamp unit is second.
public static long getNowTimeStamp() {
return System.currentTimeMillis() / 1000;
}
public static String getTimeString(long timestamp) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
return time.toString();
}
}

View File

@ -1,99 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mtmv.MTMVUtils;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class ChangeMTMVJob implements Writable {
@SerializedName("jobId")
private long jobId;
@SerializedName("lastModifyTime")
private long lastModifyTime;
@SerializedName("fromStatus")
JobState fromStatus;
@SerializedName("toStatus")
JobState toStatus;
@SerializedName("errorCode")
private int errorCode;
@SerializedName("errorMessage")
private String errorMessage;
public ChangeMTMVJob(long jobId, JobState toStatus) {
this.jobId = jobId;
this.toStatus = toStatus;
this.lastModifyTime = MTMVUtils.getNowTimeStamp();
}
public long getJobId() {
return jobId;
}
public void setJobId(long jobId) {
this.jobId = jobId;
}
public long getLastModifyTime() {
return lastModifyTime;
}
public void setLastModifyTime(long lastModifyTime) {
this.lastModifyTime = lastModifyTime;
}
public JobState getFromStatus() {
return fromStatus;
}
public void setFromStatus(JobState fromStatus) {
this.fromStatus = fromStatus;
}
public JobState getToStatus() {
return toStatus;
}
public void setToStatus(JobState toStatus) {
this.toStatus = toStatus;
}
public static ChangeMTMVJob read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ChangeMTMVJob.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -1,52 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class DropMTMVJob implements Writable {
@SerializedName("jobIds")
List<Long> jobIds;
public DropMTMVJob(List<Long> jobIds) {
this.jobIds = jobIds;
}
public List<Long> getJobIds() {
return jobIds;
}
public static DropMTMVJob read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), DropMTMVJob.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -1,53 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class DropMTMVTask implements Writable {
@SerializedName("taskIds")
List<String> taskIds;
public DropMTMVTask(List<String> taskIdList) {
this.taskIds = taskIdList;
}
public List<String> getTaskIds() {
return taskIds;
}
public static DropMTMVTask read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), DropMTMVTask.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -1,30 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import com.google.gson.annotations.SerializedName;
import java.util.List;
public class MTMVCheckpointData {
@SerializedName("jobs")
public List<MTMVJob> jobs;
@SerializedName("tasks")
public List<MTMVTask> tasks;
}

View File

@ -1,364 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mtmv.MTMVUtils;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class MTMVJob implements Writable, Comparable {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
@SerializedName("id")
private long id;
@SerializedName("name")
private String name;
// set default to MANUAL is for compatibility
@SerializedName("triggerMode")
private MTMVUtils.TriggerMode triggerMode = MTMVUtils.TriggerMode.MANUAL;
@SerializedName("state")
private MTMVUtils.JobState state = JobState.ACTIVE;
@SerializedName("schedule")
private JobSchedule schedule;
@SerializedName("createTime")
private long createTime;
@SerializedName("dbName")
private String dbName;
@SerializedName("mvName")
private String mvName;
@SerializedName("query")
private String query;
@SerializedName("properties")
private Map<String, String> properties;
@SerializedName("expireTime")
private long expireTime = -1;
// set default to ROOT is for compatibility
@SerializedName("user")
private String user = "root";
@SerializedName("retryPolicy")
private TaskRetryPolicy retryPolicy = TaskRetryPolicy.NEVER;
@SerializedName("lastModifyTime")
private long lastModifyTime;
private ScheduledFuture<?> future;
public MTMVJob(String name) {
this.name = name;
this.createTime = MTMVUtils.getNowTimeStamp();
}
public static MTMVJob read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, MTMVJob.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public TriggerMode getTriggerMode() {
return triggerMode;
}
public void setTriggerMode(TriggerMode triggerMode) {
this.triggerMode = triggerMode;
}
public JobState getState() {
return state;
}
public void setState(JobState state) {
this.state = state;
}
public JobSchedule getSchedule() {
return schedule;
}
public void setSchedule(JobSchedule schedule) {
this.schedule = schedule;
}
public long getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
public String getDBName() {
return dbName;
}
public void setDBName(String dbName) {
this.dbName = dbName;
}
public String getMVName() {
return mvName;
}
public void setMVName(String mvName) {
this.mvName = mvName;
}
public String getQuery() {
return query == null ? "" : query;
}
public void setQuery(String query) {
this.query = query;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public long getExpireTime() {
return expireTime;
}
public void setExpireTime(long expireTime) {
this.expireTime = expireTime;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public TaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}
public void setRetryPolicy(TaskRetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public long getLastModifyTime() {
return lastModifyTime;
}
public void setLastModifyTime(long lastModifyTime) {
this.lastModifyTime = lastModifyTime;
}
public static class JobSchedule {
@SerializedName("startTime")
private long startTime; // second
@SerializedName("period")
private long period;
@SerializedName("timeUnit")
private TimeUnit timeUnit;
public JobSchedule(long startTime, long period, TimeUnit timeUnit) {
this.startTime = startTime;
this.period = period;
this.timeUnit = timeUnit;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getPeriod() {
return period;
}
public void setPeriod(long period) {
this.period = period;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
public long getSecondPeriod() {
return getTimeUnit().toSeconds(getPeriod());
}
public String toString() {
return "START " + LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), ZoneId.systemDefault())
+ " EVERY(" + period + " " + timeUnit + ")";
}
}
public static final ImmutableList<String> SHOW_TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("Id")
.add("Name")
.add("TriggerMode")
.add("Schedule")
.add("DBName")
.add("MVName")
.add("Query")
.add("User")
.add("RetryPolicy")
.add("State")
.add("CreateTime")
.add("ExpireTime")
.add("LastModifyTime")
.build();
public List<String> toStringRow() {
List<String> list = Lists.newArrayList();
list.add(Long.toString(getId()));
list.add(getName());
list.add(getTriggerMode().toString());
list.add(getSchedule() == null ? "NULL" : getSchedule().toString());
list.add(getDBName());
list.add(getMVName());
list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery());
list.add(getUser());
list.add(getRetryPolicy().toString());
list.add(getState().toString());
list.add(MTMVUtils.getTimeString(getCreateTime()));
list.add(MTMVUtils.getTimeString(getExpireTime()));
list.add(MTMVUtils.getTimeString(getLastModifyTime()));
return list;
}
public synchronized void start() {
if (state == JobState.COMPLETE || state == JobState.PAUSE) {
return;
}
if (getTriggerMode() == TriggerMode.PERIODICAL) {
JobSchedule schedule = getSchedule();
ScheduledExecutorService periodScheduler = Env.getCurrentEnv().getMTMVJobManager().getPeriodScheduler();
future = periodScheduler.scheduleAtFixedRate(
() -> Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this),
MTMVUtils.getDelaySeconds(this), schedule.getSecondPeriod(), TimeUnit.SECONDS);
} else if (getTriggerMode() == TriggerMode.ONCE) {
Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this);
}
}
public synchronized void stop() {
// MUST not set true for "mayInterruptIfRunning".
// Because this thread may doing bdbje write operation, it is interrupted,
// FE may exit due to bdbje write failure.
if (future != null) {
boolean isCancel = future.cancel(false);
if (!isCancel) {
LOG.warn("fail to cancel scheduler for job [{}]", name);
}
}
Env.getCurrentEnv().getMTMVJobManager().getTaskManager().dealJobRemoved(this);
}
public void taskFinished() {
if (triggerMode == TriggerMode.ONCE) {
// update the run once job status
ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.COMPLETE);
updateJob(changeJob, false);
} else if (triggerMode == TriggerMode.PERIODICAL) {
// just update the last modify time.
ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.ACTIVE);
updateJob(changeJob, false);
}
}
public void updateJob(ChangeMTMVJob changeJob, boolean isReplay) {
setState(changeJob.getToStatus());
setLastModifyTime(changeJob.getLastModifyTime());
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
}
}
@Override
public int compareTo(@NotNull Object o) {
return (int) (getCreateTime() - ((MTMVJob) o).getCreateTime());
}
}

View File

@ -1,244 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mtmv.MTMVUtils;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.jetbrains.annotations.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class MTMVTask implements Writable, Comparable {
// also named query id in ConnectContext
@SerializedName("taskId")
private String taskId;
@SerializedName("jobName")
private String jobName;
@SerializedName("createTime")
private long createTime;
@SerializedName("finishTime")
private long finishTime;
@SerializedName("state")
private MTMVUtils.TaskState state = MTMVUtils.TaskState.PENDING;
@SerializedName("dbName")
private String dbName;
@SerializedName("mvName")
private String mvName;
@SerializedName("query")
private String query;
@SerializedName("user")
private String user;
@SerializedName("message")
private String message;
@SerializedName("errorCode")
private int errorCode;
@SerializedName("expireTime")
private long expireTime;
// the larger the value, the higher the priority, the default value is 0
@SerializedName("priority")
private int priority = 0;
@SerializedName("retryTimes")
private int retryTimes = 0;
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public long getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
public long getFinishTime() {
return finishTime;
}
public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
}
public MTMVUtils.TaskState getState() {
return state;
}
public void setState(MTMVUtils.TaskState state) {
this.state = state;
}
public String getDBName() {
return dbName;
}
public void setDBName(String dbName) {
this.dbName = dbName;
}
public String getMVName() {
return mvName;
}
public void setMVName(String mvName) {
this.mvName = mvName;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getQuery() {
return query == null ? "" : query;
}
public void setQuery(String query) {
this.query = query;
}
public int getErrorCode() {
return errorCode;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public String getMessage() {
return message == null ? "" : message;
}
public void setMessage(String message) {
this.message = message;
}
public long getExpireTime() {
return expireTime;
}
public void setExpireTime(long expireTime) {
this.expireTime = expireTime;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public static MTMVTask read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, MTMVTask.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static final ImmutableList<String> SHOW_TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("TaskId")
.add("JobName")
.add("DBName")
.add("MVName")
.add("Query")
.add("User")
.add("Priority")
.add("RetryTimes")
.add("State")
.add("Message")
.add("ErrorCode")
.add("CreateTime")
.add("ExpireTime")
.add("FinishTime")
.build();
public List<String> toStringRow() {
List<String> list = Lists.newArrayList();
list.add(getTaskId());
list.add(getJobName());
list.add(getDBName());
list.add(getMVName());
list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery());
list.add(getUser());
list.add(Integer.toString(getPriority()));
list.add(Integer.toString(getRetryTimes()));
list.add(getState().toString());
list.add(getMessage().length() > 10240 ? getMessage().substring(0, 10240) : getMessage());
list.add(Integer.toString(getErrorCode()));
list.add(MTMVUtils.getTimeString(getCreateTime()));
list.add(MTMVUtils.getTimeString(getExpireTime()));
list.add(MTMVUtils.getTimeString(getFinishTime()));
return list;
}
@Override
public int compareTo(@NotNull Object o) {
return (int) (getCreateTime() - ((MTMVTask) o).getCreateTime());
}
}

View File

@ -1,61 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.analysis.MVRefreshInfo;
import org.apache.doris.analysis.TableName;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class AlterMultiMaterializedView implements Writable {
@SerializedName(value = "mvName")
private TableName mvName;
@SerializedName(value = "info")
private MVRefreshInfo info;
public AlterMultiMaterializedView(TableName mvName, MVRefreshInfo info) {
this.mvName = mvName;
this.info = info;
}
public MVRefreshInfo getInfo() {
return info;
}
public TableName getMvName() {
return mvName;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static AlterMultiMaterializedView read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AlterMultiMaterializedView.class);
}
}

View File

@ -72,11 +72,6 @@ import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.policy.DropPolicyLog;
@ -968,37 +963,13 @@ public class EditLog {
env.getLoadManager().replayCleanLabel(log);
break;
}
case OperationType.OP_CREATE_MTMV_JOB: {
final MTMVJob job = (MTMVJob) journal.getData();
env.getMTMVJobManager().replayCreateJob(job);
break;
}
case OperationType.OP_CHANGE_MTMV_JOB: {
final ChangeMTMVJob changeJob = (ChangeMTMVJob) journal.getData();
env.getMTMVJobManager().replayUpdateJob(changeJob);
break;
}
case OperationType.OP_DROP_MTMV_JOB: {
final DropMTMVJob dropJob = (DropMTMVJob) journal.getData();
env.getMTMVJobManager().replayDropJobs(dropJob.getJobIds());
break;
}
case OperationType.OP_CREATE_MTMV_TASK: {
final MTMVTask task = (MTMVTask) journal.getData();
env.getMTMVJobManager().replayCreateJobTask(task);
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
break;
}
case OperationType.OP_DROP_MTMV_TASK: {
final DropMTMVTask dropTask = (DropMTMVTask) journal.getData();
env.getMTMVJobManager().replayDropJobTasks(dropTask.getTaskIds());
break;
}
case OperationType.OP_CREATE_MTMV_JOB:
case OperationType.OP_CHANGE_MTMV_JOB:
case OperationType.OP_DROP_MTMV_JOB:
case OperationType.OP_CREATE_MTMV_TASK:
case OperationType.OP_CHANGE_MTMV_TASK:
case OperationType.OP_DROP_MTMV_TASK:
case OperationType.OP_ALTER_MTMV_STMT: {
final AlterMultiMaterializedView alterView = (AlterMultiMaterializedView) journal.getData();
env.getAlterInstance().processAlterMaterializedView(alterView, true);
break;
}
case OperationType.OP_ALTER_USER: {
@ -1852,26 +1823,6 @@ public class EditLog {
logEdit(id, log);
}
public void logCreateMTMVJob(MTMVJob job) {
logEdit(OperationType.OP_CREATE_MTMV_JOB, job);
}
public void logDropMTMVJob(List<Long> jobIds) {
logEdit(OperationType.OP_DROP_MTMV_JOB, new DropMTMVJob(jobIds));
}
public void logChangeMTMVJob(ChangeMTMVJob changeJob) {
logEdit(OperationType.OP_CHANGE_MTMV_JOB, changeJob);
}
public void logCreateMTMVTask(MTMVTask task) {
logEdit(OperationType.OP_CREATE_MTMV_TASK, task);
}
public void logDropMTMVTasks(List<String> taskIds) {
logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
}
public void logInitCatalog(InitCatalogLog log) {
logEdit(OperationType.OP_INIT_CATALOG, log);
}
@ -1941,10 +1892,6 @@ public class EditLog {
logEdit(OperationType.OP_ALTER_USER, log);
}
public void logAlterMTMV(AlterMultiMaterializedView log) {
logEdit(OperationType.OP_ALTER_MTMV_STMT, log);
}
public void logCleanQueryStats(CleanQueryStatsInfo log) {
logEdit(OperationType.OP_CLEAN_QUERY_STATS, log);
}

View File

@ -271,16 +271,19 @@ public class OperationType {
@Deprecated
public static final short OP_INIT_EXTERNAL_TABLE = 329;
// scheduler job and task 330-350
@Deprecated
public static final short OP_CREATE_MTMV_JOB = 330;
@Deprecated
public static final short OP_DROP_MTMV_JOB = 331;
@Deprecated
public static final short OP_CHANGE_MTMV_JOB = 332;
@Deprecated
public static final short OP_CREATE_MTMV_TASK = 340;
@Deprecated
public static final short OP_DROP_MTMV_TASK = 341;
@Deprecated
public static final short OP_CHANGE_MTMV_TASK = 342;
@Deprecated
public static final short OP_ALTER_MTMV_STMT = 345;
public static final short OP_DROP_EXTERNAL_TABLE = 350;

View File

@ -197,12 +197,6 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveCatalog",
CountingDataOutputStream.class, long.class);
break;
case "mtmvJobManager":
metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadMTMVJobManager", DataInputStream.class,
long.class);
metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveMTMVJobManager",
CountingDataOutputStream.class, long.class);
break;
case "globalFunction":
metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadGlobalFunction", DataInputStream.class,
long.class);

View File

@ -38,7 +38,7 @@ public class PersistMetaModules {
"masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "workloadGroups",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "JobTaskManager");
// Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)

View File

@ -37,7 +37,6 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterMaterializedViewStmt;
import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
@ -64,7 +63,6 @@ import org.apache.doris.analysis.CreateFileStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateJobStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
@ -106,7 +104,6 @@ import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshLdapStmt;
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.ResumeJobStmt;
@ -154,8 +151,6 @@ public class DdlExecutor {
EncryptKeyHelper.createEncryptKey((CreateEncryptKeyStmt) ddlStmt);
} else if (ddlStmt instanceof DropEncryptKeyStmt) {
EncryptKeyHelper.dropEncryptKey((DropEncryptKeyStmt) ddlStmt);
} else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) {
env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof CreateTableStmt) {
env.createTable((CreateTableStmt) ddlStmt);
} else if (ddlStmt instanceof CreateTableLikeStmt) {
@ -340,12 +335,8 @@ public class DdlExecutor {
env.getCatalogMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt);
} else if (ddlStmt instanceof CleanLabelStmt) {
env.getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt);
} else if (ddlStmt instanceof AlterMaterializedViewStmt) {
env.alterMaterializedView((AlterMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof DropMaterializedViewStmt) {
env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof RefreshMaterializedViewStmt) {
env.refreshMaterializedView((RefreshMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof RefreshCatalogStmt) {
env.getCatalogMgr().refreshCatalog((RefreshCatalogStmt) ddlStmt);
} else if (ddlStmt instanceof RefreshLdapStmt) {

View File

@ -69,8 +69,6 @@ import org.apache.doris.analysis.ShowLastInsertStmt;
import org.apache.doris.analysis.ShowLoadProfileStmt;
import org.apache.doris.analysis.ShowLoadStmt;
import org.apache.doris.analysis.ShowLoadWarningsStmt;
import org.apache.doris.analysis.ShowMTMVJobStmt;
import org.apache.doris.analysis.ShowMTMVTaskStmt;
import org.apache.doris.analysis.ShowPartitionIdStmt;
import org.apache.doris.analysis.ShowPartitionsStmt;
import org.apache.doris.analysis.ShowPluginsStmt;
@ -190,9 +188,6 @@ import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
@ -425,10 +420,6 @@ public class ShowExecutor {
handleCopyTablet();
} else if (stmt instanceof ShowCatalogRecycleBinStmt) {
handleShowCatalogRecycleBin();
} else if (stmt instanceof ShowMTMVJobStmt) {
handleMTMVJobs();
} else if (stmt instanceof ShowMTMVTaskStmt) {
handleMTMVTasks();
} else if (stmt instanceof ShowTypeCastStmt) {
handleShowTypeCastStmt();
} else if (stmt instanceof ShowBuildIndexStmt) {
@ -2796,46 +2787,6 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}
private void handleMTMVJobs() throws AnalysisException {
ShowMTMVJobStmt showStmt = (ShowMTMVJobStmt) stmt;
MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
List<MTMVJob> jobs = Lists.newArrayList();
if (showStmt.isShowAllJobs()) {
jobs.addAll(jobManager.showAllJobs());
} else if (showStmt.isShowAllJobsFromDb()) {
jobs.addAll(jobManager.showJobs(showStmt.getDbName()));
} else if (showStmt.isShowAllJobsOnMv()) {
jobs.addAll(jobManager.showJobs(showStmt.getDbName(), showStmt.getMVName()));
} else if (showStmt.isSpecificJob()) {
jobs.add(jobManager.getJob(showStmt.getJobName()));
}
List<List<String>> results = Lists.newArrayList();
for (MTMVJob job : jobs) {
results.add(job.toStringRow());
}
resultSet = new ShowResultSet(showStmt.getMetaData(), results);
}
private void handleMTMVTasks() throws AnalysisException {
ShowMTMVTaskStmt showStmt = (ShowMTMVTaskStmt) stmt;
MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
List<MTMVTask> tasks = Lists.newArrayList();
if (showStmt.isShowAllTasks()) {
tasks.addAll(jobManager.getTaskManager().showAllTasks());
} else if (showStmt.isShowAllTasksFromDb()) {
tasks.addAll(jobManager.getTaskManager().showTasksWithLock(showStmt.getDbName()));
} else if (showStmt.isShowAllTasksOnMv()) {
tasks.addAll(jobManager.getTaskManager().showTasks(showStmt.getDbName(), showStmt.getMVName()));
} else if (showStmt.isSpecificTask()) {
tasks.add(jobManager.getTaskManager().getTask(showStmt.getTaskId()));
}
List<List<String>> results = Lists.newArrayList();
for (MTMVTask task : tasks) {
results.add(task.toStringRow());
}
resultSet = new ShowResultSet(showStmt.getMetaData(), results);
}
private void handleShowTypeCastStmt() throws AnalysisException {
ShowTypeCastStmt showStmt = (ShowTypeCastStmt) stmt;