[Feature](multi-catalog) Support refresh catalog metadata (#11656)
This commit is contained in:
@ -808,6 +808,10 @@ refresh_stmt ::=
|
||||
{:
|
||||
RESULT = new RefreshMaterializedViewStmt(mv, MVRefreshInfo.RefreshMethod.COMPLETE);
|
||||
:}
|
||||
| KW_REFRESH KW_CATALOG ident:catalogName
|
||||
{:
|
||||
RESULT = new RefreshCatalogStmt(catalogName);
|
||||
:}
|
||||
;
|
||||
|
||||
clean_stmt ::=
|
||||
|
||||
@ -0,0 +1,67 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.InternalDataSource;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
/**
|
||||
* RefreshCatalogStmt
|
||||
* Manually refresh the catalog metadata.
|
||||
*/
|
||||
public class RefreshCatalogStmt extends DdlStmt {
|
||||
|
||||
private final String catalogName;
|
||||
|
||||
public RefreshCatalogStmt(String catalogName) {
|
||||
this.catalogName = catalogName;
|
||||
}
|
||||
|
||||
public String getCatalogName() {
|
||||
return catalogName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
Util.checkCatalogAllRules(catalogName);
|
||||
if (catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) {
|
||||
throw new AnalysisException("Internal catalog name can't be refresh.");
|
||||
}
|
||||
|
||||
if (!Env.getCurrentEnv().getAuth().checkCtlPriv(
|
||||
ConnectContext.get(), catalogName, PrivPredicate.ALTER)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED,
|
||||
analyzer.getQualifiedUser(), catalogName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append("REFRESH CATALOG ").append("`").append(catalogName).append("`");
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
}
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt;
|
||||
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
|
||||
import org.apache.doris.analysis.CreateCatalogStmt;
|
||||
import org.apache.doris.analysis.DropCatalogStmt;
|
||||
import org.apache.doris.analysis.RefreshCatalogStmt;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
@ -48,6 +49,8 @@ public class CatalogFactory {
|
||||
} else if (stmt instanceof AlterCatalogNameStmt) {
|
||||
log.setCatalogId(catalogId);
|
||||
log.setNewCatalogName(((AlterCatalogNameStmt) stmt).getNewCatalogName());
|
||||
} else if (stmt instanceof RefreshCatalogStmt) {
|
||||
log.setCatalogId(catalogId);
|
||||
} else {
|
||||
throw new RuntimeException("Unknown stmt for datasource manager " + stmt.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt;
|
||||
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
|
||||
import org.apache.doris.analysis.CreateCatalogStmt;
|
||||
import org.apache.doris.analysis.DropCatalogStmt;
|
||||
import org.apache.doris.analysis.RefreshCatalogStmt;
|
||||
import org.apache.doris.analysis.ShowCatalogStmt;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -95,6 +96,16 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
return catalog;
|
||||
}
|
||||
|
||||
private void unprotectedRefreshCatalog(long catalogId) {
|
||||
DataSourceIf catalog = idToCatalog.get(catalogId);
|
||||
if (catalog != null) {
|
||||
String catalogName = catalog.getName();
|
||||
if (!catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) {
|
||||
((ExternalDataSource) catalog).setInitialized(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public InternalDataSource getInternalDataSource() {
|
||||
return internalDataSource;
|
||||
}
|
||||
@ -192,7 +203,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
long id = Env.getCurrentEnv().getNextId();
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(id, stmt);
|
||||
replayCreateCatalog(log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_DS, log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_CATALOG, log);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
@ -214,7 +225,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
replayDropCatalog(log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_DS, log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_CATALOG, log);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
@ -232,7 +243,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
replayAlterCatalogName(log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_NAME, log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_NAME, log);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
@ -253,7 +264,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
replayAlterCatalogProps(log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_PROPS, log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_PROPS, log);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
@ -308,6 +319,24 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
return new ShowResultSet(showStmt.getMetaData(), rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh the catalog meta and write the meta log.
|
||||
*/
|
||||
public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
|
||||
writeLock();
|
||||
try {
|
||||
DataSourceIf catalog = nameToCatalog.get(stmt.getCatalogName());
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
replayRefreshCatalog(log);
|
||||
Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_REFRESH_CATALOG, log);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reply for create catalog event.
|
||||
*/
|
||||
@ -333,6 +362,18 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reply for refresh catalog event.
|
||||
*/
|
||||
public void replayRefreshCatalog(CatalogLog log) throws DdlException {
|
||||
writeLock();
|
||||
try {
|
||||
unprotectedRefreshCatalog(log.getCatalogId());
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reply for alter catalog name event.
|
||||
*/
|
||||
|
||||
@ -60,8 +60,6 @@ public class EsExternalDataSource extends ExternalDataSource {
|
||||
|
||||
private EsRestClient esRestClient;
|
||||
|
||||
private boolean initialized = false;
|
||||
|
||||
private String[] nodes;
|
||||
|
||||
private String username = null;
|
||||
|
||||
@ -49,6 +49,7 @@ public abstract class ExternalDataSource implements DataSourceIf<ExternalDatabas
|
||||
// save properties of this data source, such as hive meta store url.
|
||||
@SerializedName(value = "dsProperty")
|
||||
protected DataSourceProperty dsProperty = new DataSourceProperty();
|
||||
protected boolean initialized = false;
|
||||
|
||||
/**
|
||||
* @return names of database in this data source.
|
||||
|
||||
@ -44,7 +44,6 @@ public class HMSExternalDataSource extends ExternalDataSource {
|
||||
// Cache of db name to db id.
|
||||
private Map<String, Long> dbNameToId;
|
||||
private Map<Long, HMSExternalDatabase> idToDb;
|
||||
private boolean initialized = false;
|
||||
protected HiveMetaStoreClient client;
|
||||
|
||||
/**
|
||||
|
||||
@ -667,10 +667,10 @@ public class JournalEntity implements Writable {
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_DS:
|
||||
case OperationType.OP_DROP_DS:
|
||||
case OperationType.OP_ALTER_DS_NAME:
|
||||
case OperationType.OP_ALTER_DS_PROPS: {
|
||||
case OperationType.OP_CREATE_CATALOG:
|
||||
case OperationType.OP_DROP_CATALOG:
|
||||
case OperationType.OP_ALTER_CATALOG_NAME:
|
||||
case OperationType.OP_ALTER_CATALOG_PROPS: {
|
||||
data = CatalogLog.read(in);
|
||||
isRead = true;
|
||||
break;
|
||||
|
||||
@ -95,6 +95,9 @@ public class EditLog {
|
||||
|
||||
private Journal journal;
|
||||
|
||||
/**
|
||||
* The constructor.
|
||||
**/
|
||||
public EditLog(String nodeName) {
|
||||
String journalType = Config.edit_log_type;
|
||||
if (journalType.equalsIgnoreCase("bdb")) {
|
||||
@ -134,6 +137,9 @@ public class EditLog {
|
||||
return journal == null ? 0 : 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load journal.
|
||||
**/
|
||||
public static void loadJournal(Env env, JournalEntity journal) {
|
||||
short opCode = journal.getOpCode();
|
||||
if (opCode != OperationType.OP_SAVE_NEXTID && opCode != OperationType.OP_TIMESTAMP) {
|
||||
@ -838,26 +844,31 @@ public class EditLog {
|
||||
env.getPolicyMgr().replayStoragePolicyAlter(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_DS: {
|
||||
case OperationType.OP_CREATE_CATALOG: {
|
||||
CatalogLog log = (CatalogLog) journal.getData();
|
||||
env.getDataSourceMgr().replayCreateCatalog(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_DROP_DS: {
|
||||
case OperationType.OP_DROP_CATALOG: {
|
||||
CatalogLog log = (CatalogLog) journal.getData();
|
||||
env.getDataSourceMgr().replayDropCatalog(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_DS_NAME: {
|
||||
case OperationType.OP_ALTER_CATALOG_NAME: {
|
||||
CatalogLog log = (CatalogLog) journal.getData();
|
||||
env.getDataSourceMgr().replayAlterCatalogName(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_DS_PROPS: {
|
||||
case OperationType.OP_ALTER_CATALOG_PROPS: {
|
||||
CatalogLog log = (CatalogLog) journal.getData();
|
||||
env.getDataSourceMgr().replayAlterCatalogProps(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_REFRESH_CATALOG: {
|
||||
CatalogLog log = (CatalogLog) journal.getData();
|
||||
env.getDataSourceMgr().replayRefreshCatalog(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: {
|
||||
final TableAddOrDropColumnsInfo info = (TableAddOrDropColumnsInfo) journal.getData();
|
||||
env.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info);
|
||||
@ -875,7 +886,7 @@ public class EditLog {
|
||||
}
|
||||
}
|
||||
} catch (MetaNotFoundException e) {
|
||||
/**
|
||||
/*
|
||||
* In the following cases, doris may record metadata modification information
|
||||
* for a table that no longer exists.
|
||||
* 1. Thread 1: get TableA object
|
||||
|
||||
@ -20,6 +20,9 @@ package org.apache.doris.persist;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
|
||||
/**
|
||||
* Operation name and code mapping.
|
||||
**/
|
||||
public class OperationType {
|
||||
// OP_LOCAL_EOF is only for local edit log, to indicate the end of a edit log run.
|
||||
public static final short OP_LOCAL_EOF = -1;
|
||||
@ -227,15 +230,18 @@ public class OperationType {
|
||||
// policy 310-320
|
||||
public static final short OP_CREATE_POLICY = 310;
|
||||
public static final short OP_DROP_POLICY = 311;
|
||||
public static final short OP_ALTER_STORAGE_POLICY = 312;
|
||||
|
||||
// datasource 312-315
|
||||
public static final short OP_CREATE_DS = 312;
|
||||
public static final short OP_DROP_DS = 313;
|
||||
public static final short OP_ALTER_DS_NAME = 314;
|
||||
public static final short OP_ALTER_DS_PROPS = 315;
|
||||
public static final short OP_ALTER_STORAGE_POLICY = 316;
|
||||
// catalog 320-330
|
||||
public static final short OP_CREATE_CATALOG = 320;
|
||||
public static final short OP_DROP_CATALOG = 321;
|
||||
public static final short OP_ALTER_CATALOG_NAME = 322;
|
||||
public static final short OP_ALTER_CATALOG_PROPS = 323;
|
||||
public static final short OP_REFRESH_CATALOG = 324;
|
||||
|
||||
// get opcode name by op codeStri
|
||||
/**
|
||||
* Get opcode name by op code.
|
||||
**/
|
||||
public static String getOpName(short opCode) {
|
||||
try {
|
||||
Field[] fields = OperationType.class.getDeclaredFields();
|
||||
|
||||
@ -95,6 +95,7 @@ import org.apache.doris.analysis.PauseSyncJobStmt;
|
||||
import org.apache.doris.analysis.RecoverDbStmt;
|
||||
import org.apache.doris.analysis.RecoverPartitionStmt;
|
||||
import org.apache.doris.analysis.RecoverTableStmt;
|
||||
import org.apache.doris.analysis.RefreshCatalogStmt;
|
||||
import org.apache.doris.analysis.RefreshDbStmt;
|
||||
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
|
||||
import org.apache.doris.analysis.RefreshTableStmt;
|
||||
@ -116,7 +117,13 @@ import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.sync.SyncJobManager;
|
||||
|
||||
/**
|
||||
* Use for execute ddl.
|
||||
**/
|
||||
public class DdlExecutor {
|
||||
/**
|
||||
* Execute ddl.
|
||||
**/
|
||||
public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
|
||||
if (ddlStmt instanceof CreateClusterStmt) {
|
||||
CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
|
||||
@ -153,8 +160,6 @@ public class DdlExecutor {
|
||||
env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) {
|
||||
env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof DropMaterializedViewStmt) {
|
||||
env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AlterTableStmt) {
|
||||
env.alterTable((AlterTableStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AlterTableStatsStmt) {
|
||||
@ -322,13 +327,15 @@ public class DdlExecutor {
|
||||
} else if (ddlStmt instanceof AlterCatalogPropertyStmt) {
|
||||
env.getDataSourceMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof CleanLabelStmt) {
|
||||
env.getCurrentEnv().getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt);
|
||||
env.getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AlterMaterializedViewStmt) {
|
||||
env.alterMaterializedView((AlterMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof DropMaterializedViewStmt) {
|
||||
env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof RefreshMaterializedViewStmt) {
|
||||
env.refreshMaterializedView((RefreshMaterializedViewStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof RefreshCatalogStmt) {
|
||||
env.getDataSourceMgr().refreshCatalog((RefreshCatalogStmt) ddlStmt);
|
||||
} else {
|
||||
throw new DdlException("Unknown statement.");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user