[feature-wip](MTMV) support multi catalog (#19854)
* mtmv support multi catalog * mtmv support multi catalog
This commit is contained in:
@ -41,6 +41,7 @@ import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
@ -1266,12 +1267,12 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
|
||||
public void processCreateMultiTablesMaterializedView(CreateMultiTableMaterializedViewStmt addMVClause)
|
||||
throws UserException {
|
||||
Map<String, Table> olapTables = addMVClause.getTables();
|
||||
Map<String, TableIf> olapTables = addMVClause.getTables();
|
||||
try {
|
||||
olapTables.values().forEach(Table::readLock);
|
||||
olapTables.values().forEach(TableIf::readLock);
|
||||
Env.getCurrentEnv().createTable(addMVClause);
|
||||
} finally {
|
||||
olapTables.values().forEach(Table::readUnlock);
|
||||
olapTables.values().forEach(TableIf::readUnlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
@ -55,10 +56,10 @@ public class ColumnPartitionDesc extends PartitionDesc {
|
||||
singlePartitionDescs = partitionDesc.getSinglePartitionDescs();
|
||||
}
|
||||
|
||||
private Table matchTable(Map<String, Table> olapTables) throws AnalysisException {
|
||||
private Table matchTable(Map<String, TableIf> olapTables) throws AnalysisException {
|
||||
Table matched = null;
|
||||
for (SlotRef column : columns) {
|
||||
Table table = olapTables.get(column.getDesc().getParent().getTable().getName());
|
||||
TableIf table = olapTables.get(column.getDesc().getParent().getTable().getName());
|
||||
if (table != null) {
|
||||
if (!(table instanceof OlapTable)) {
|
||||
throw new AnalysisException(
|
||||
@ -67,7 +68,7 @@ public class ColumnPartitionDesc extends PartitionDesc {
|
||||
if (matched != null && !matched.getName().equals(table.getName())) {
|
||||
throw new AnalysisException("The partition columns must be in the same table.");
|
||||
} else if (matched == null) {
|
||||
matched = table;
|
||||
matched = (Table) table;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,21 +20,25 @@ package org.apache.doris.analysis;
|
||||
import org.apache.doris.analysis.ColumnDef.DefaultValue;
|
||||
import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
@ -42,8 +46,7 @@ public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
private final MVRefreshInfo.BuildMode buildMode;
|
||||
private final MVRefreshInfo refreshInfo;
|
||||
private final QueryStmt queryStmt;
|
||||
private Database database;
|
||||
private final Map<String, Table> tables = Maps.newHashMap();
|
||||
private final Map<String, TableIf> tables = Maps.newHashMap();
|
||||
|
||||
public CreateMultiTableMaterializedViewStmt(String mvName, MVRefreshInfo.BuildMode buildMode,
|
||||
MVRefreshInfo refreshInfo, KeysDesc keyDesc, PartitionDesc partitionDesc, DistributionDesc distributionDesc,
|
||||
@ -73,7 +76,12 @@ public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
if (queryStmt instanceof SelectStmt) {
|
||||
analyzeSelectClause((SelectStmt) queryStmt);
|
||||
}
|
||||
tableName = new TableName(null, database.getFullName(), mvName);
|
||||
String defaultDb = analyzer.getDefaultDb();
|
||||
if (Strings.isNullOrEmpty(defaultDb)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
|
||||
}
|
||||
|
||||
tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, defaultDb, mvName);
|
||||
if (partitionDesc != null) {
|
||||
((ColumnPartitionDesc) partitionDesc).analyze(analyzer, this);
|
||||
}
|
||||
@ -82,18 +90,26 @@ public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
|
||||
private void analyzeSelectClause(SelectStmt selectStmt) throws AnalysisException {
|
||||
for (TableRef tableRef : selectStmt.getTableRefs()) {
|
||||
Table table = null;
|
||||
TableIf table = null;
|
||||
if (tableRef instanceof BaseTableRef) {
|
||||
String dbName = tableRef.getName().getDb();
|
||||
if (database == null) {
|
||||
database = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
|
||||
} else if (!dbName.equals(database.getFullName())) {
|
||||
throw new AnalysisException("The databases of multiple tables must be the same.");
|
||||
String ctlName = tableRef.getName().getCtl();
|
||||
CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlName);
|
||||
if (catalogIf == null) {
|
||||
throw new AnalysisException("Failed to get the catalog: " + ctlName);
|
||||
}
|
||||
table = database.getTableOrAnalysisException(tableRef.getName().getTbl());
|
||||
Optional<DatabaseIf> dbIf = catalogIf.getDb(dbName);
|
||||
if (!dbIf.isPresent()) {
|
||||
throw new AnalysisException("Failed to get the database: " + dbName);
|
||||
}
|
||||
Optional<TableIf> tableIf = dbIf.get().getTable(tableRef.getName().getTbl());
|
||||
if (!tableIf.isPresent()) {
|
||||
throw new AnalysisException("Failed to get the table: " + tableRef.getName().getTbl());
|
||||
}
|
||||
table = tableIf.orElse(null);
|
||||
} else if (tableRef instanceof InlineViewRef) {
|
||||
InlineViewRef inlineViewRef = (InlineViewRef) tableRef;
|
||||
table = (Table) inlineViewRef.getDesc().getTable();
|
||||
table = inlineViewRef.getDesc().getTable();
|
||||
}
|
||||
if (table == null) {
|
||||
throw new AnalysisException("Failed to get the table by " + tableRef);
|
||||
@ -121,7 +137,7 @@ public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
private List<Column> generateSchema(SelectList selectList) throws AnalysisException {
|
||||
Map<String, Column> uniqueMVColumnItems = Maps.newLinkedHashMap();
|
||||
for (SelectListItem item : selectList.getItems()) {
|
||||
Column column = generateMTMVColumn(item);
|
||||
Column column = generateMTMVColumn(item);
|
||||
if (uniqueMVColumnItems.put(column.getName(), column) != null) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, column.getName());
|
||||
}
|
||||
@ -187,11 +203,7 @@ public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt {
|
||||
return mvName;
|
||||
}
|
||||
|
||||
public Database getDatabase() {
|
||||
return database;
|
||||
}
|
||||
|
||||
public Map<String, Table> getTables() {
|
||||
public Map<String, TableIf> getTables() {
|
||||
return tables;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user