diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 5941ea8305..62c3a1a526 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -131,6 +131,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.datasource.ExternalMetaIdMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HiveTransactionMgr; import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; @@ -362,6 +363,7 @@ public class Env { private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; private CooldownConfHandler cooldownConfHandler; + private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; private ExportTaskRegister exportTaskRegister; @@ -649,6 +651,7 @@ public class Env { if (Config.enable_storage_policy) { this.cooldownConfHandler = new CooldownConfHandler(); } + this.externalMetaIdMgr = new ExternalMetaIdMgr(); this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.jobManager = new JobManager<>(); this.labelProcessor = new LabelProcessor(); @@ -844,6 +847,14 @@ public class Env { return workloadRuntimeStatusMgr; } + public ExternalMetaIdMgr getExternalMetaIdMgr() { + return externalMetaIdMgr; + } + + public MetastoreEventsProcessor getMetastoreEventsProcessor() { + return metastoreEventsProcessor; + } + // use this to get correct ClusterInfoService instance public static SystemInfoService getCurrentSystemInfo() { return getCurrentEnv().getClusterInfo(); @@ -1638,9 +1649,6 @@ public class Env { streamLoadRecordMgr.start(); tabletLoadIndexRecorderMgr.start(); new InternalSchemaInitializer().start(); - if (Config.enable_hms_events_incremental_sync) { - metastoreEventsProcessor.start(); - } getRefreshManager().start(); // binlog gcer @@ -1662,6 +1670,9 @@ public class Env { domainResolver.start(); // fe disk updater feDiskUpdater.start(); + if (Config.enable_hms_events_incremental_sync) { + metastoreEventsProcessor.start(); + } } private void transferToNonMaster(FrontendNodeType newType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index e6584d4a0b..4af08e9b38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -146,8 +146,15 @@ public abstract class ExternalDatabase Map tmpIdToTbl = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { T table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); + // When upgrade cluster with this pr: https://github.com/apache/doris/pull/27666 + // Maybe there are some create table events will be skipped + // if the cluster has any hms catalog(s) with hms event listener enabled. + // So we need add a validation here to avoid table(s) not found, this is just a temporary solution + // because later we will remove all the logics about InitCatalogLog/InitDatabaseLog. + if (table != null) { + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } } for (int i = 0; i < log.getCreateCount(); i++) { T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog); @@ -195,8 +202,7 @@ public abstract class ExternalDatabase idToTbl = tmpIdToTbl; } - long currentTime = System.currentTimeMillis(); - lastUpdateTime = currentTime; + lastUpdateTime = System.currentTimeMillis(); initDatabaseLog.setLastUpdateTime(lastUpdateTime); initialized = true; Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); @@ -370,17 +376,13 @@ public abstract class ExternalDatabase throw new NotImplementedException("dropTable() is not implemented"); } - public void dropTableForReplay(String tableName) { - throw new NotImplementedException("replayDropTableFromEvent() is not implemented"); - } - @Override public CatalogIf getCatalog() { return extCatalog; } // Only used for sync hive metastore event - public void createTableForReplay(String tableName, long tableId) { + public void createTable(String tableName, long tableId) { throw new NotImplementedException("createTable() is not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index d75f86bd08..f586ea7ed8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -64,17 +64,6 @@ public class HMSExternalDatabase extends ExternalDatabase { @Override public void dropTable(String tableName) { - LOG.debug("drop table [{}]", tableName); - makeSureInitialized(); - Long tableId = tableNameToId.remove(tableName); - if (tableId == null) { - LOG.warn("drop table [{}] failed", tableName); - } - idToTbl.remove(tableId); - } - - @Override - public void dropTableForReplay(String tableName) { LOG.debug("replayDropTableFromEvent [{}]", tableName); Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -85,7 +74,7 @@ public class HMSExternalDatabase extends ExternalDatabase { } @Override - public void createTableForReplay(String tableName, long tableId) { + public void createTable(String tableName, long tableId) { LOG.debug("create table [{}]", tableName); tableNameToId.put(tableName, tableId); HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java index a915b3b241..1b5cd805a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java @@ -49,7 +49,7 @@ public class IcebergExternalDatabase extends ExternalDatabase oldProperties = catalog.getProperties(); if (catalog == null) { throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); } + Map oldProperties = catalog.getProperties(); if (stmt.getNewProperties().containsKey("type") && !catalog.getType() .equalsIgnoreCase(stmt.getNewProperties().get("type"))) { throw new DdlException("Can't modify the type of catalog property with name: " + stmt.getCatalogName()); @@ -681,12 +682,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { ((HMSExternalTable) table).unsetObjectCreated(); ((HMSExternalTable) table).setEventUpdateTime(updateTime); Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setLastUpdateTime(updateTime); - Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); } public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists) @@ -772,43 +767,16 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setLastUpdateTime(System.currentTimeMillis()); - replayDropExternalTable(log); - Env.getCurrentEnv().getEditLog().logDropExternalTable(log); - } - public void replayDropExternalTable(ExternalObjectLog log) { - LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(), - log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } db.writeLock(); try { - db.dropTableForReplay(table.getName()); - db.setLastUpdateTime(log.getLastUpdateTime()); + db.dropTable(table.getName()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache( + catalog.getId(), db.getFullName(), table.getName()); + ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); } finally { db.writeUnlock(); } - - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); } public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException { @@ -822,9 +790,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName); } - public void createExternalTableFromEvent(String dbName, String tableName, String catalogName, - boolean ignoreIfExists) - throws DdlException { + public void createExternalTableFromEvent(String dbName, String tableName, + String catalogName, long updateTime, + boolean ignoreIfExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); @@ -847,33 +815,21 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableName(tableName); - log.setTableId(Env.getCurrentEnv().getNextId()); - log.setLastUpdateTime(System.currentTimeMillis()); - replayCreateExternalTableFromEvent(log); - Env.getCurrentEnv().getEditLog().logCreateExternalTable(log); - } - public void replayCreateExternalTableFromEvent(ExternalObjectLog log) { - LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId(), log.getTableName()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + long tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName); + // -1L means it will be dropped later, ignore + if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) { return; } + db.writeLock(); try { - db.createTableForReplay(log.getTableName(), log.getTableId()); - db.setLastUpdateTime(log.getLastUpdateTime()); + ((HMSExternalDatabase) db).createTable(tableName, tblId); + ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); + table = db.getTableNullable(tableName); + if (table != null) { + ((HMSExternalTable) table).setEventUpdateTime(updateTime); + } } finally { db.writeUnlock(); } @@ -895,34 +851,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setInvalidCache(true); - replayDropExternalDatabase(log); - Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log); - } - - public void replayDropExternalDatabase(ExternalObjectLog log) { - writeLock(); - try { - LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - catalog.dropDatabaseForReplay(db.getFullName()); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName()); - } finally { - writeUnlock(); - } + ((HMSExternalCatalog) catalog).dropDatabase(dbName); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), dbName); } public void createExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) throws DdlException { @@ -941,28 +871,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(Env.getCurrentEnv().getNextId()); - log.setDbName(dbName); - replayCreateExternalDatabase(log); - Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log); - } - - public void replayCreateExternalDatabase(ExternalObjectLog log) { - writeLock(); - try { - LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(), - log.getDbId(), log.getDbName()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - catalog.createDatabaseForReplay(log.getDbId(), log.getDbName()); - } finally { - writeUnlock(); + long dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName); + // -1L means it will be dropped later, ignore + if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) { + return; } + + ((HMSExternalCatalog) catalog).createDatabase(dbId, dbName); } public void addExternalPartitions(String catalogName, String dbName, String tableName, @@ -998,48 +913,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { HMSExternalTable hmsTable = (HMSExternalTable) table; Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames); hmsTable.setEventUpdateTime(updateTime); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); - } - - public void replayAddExternalPartitions(ExternalObjectLog log) { - LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } - - HMSExternalTable hmsTable = (HMSExternalTable) table; - try { - Env.getCurrentEnv().getExtMetaCacheMgr() - .addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setEventUpdateTime(log.getLastUpdateTime()); - } catch (HMSClientException e) { - LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), - db.getFullName(), table.getName()); - } } public void dropExternalPartitions(String catalogName, String dbName, String tableName, @@ -1068,42 +941,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - replayDropExternalPartitions(log); - Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log); - } - - public void replayDropExternalPartitions(ExternalObjectLog log) { - LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } HMSExternalTable hmsTable = (HMSExternalTable) table; - Env.getCurrentEnv().getExtMetaCacheMgr() - .dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setEventUpdateTime(log.getLastUpdateTime()); + Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames); + hmsTable.setEventUpdateTime(updateTime); } public void refreshExternalPartitions(String catalogName, String dbName, String tableName, @@ -1135,42 +975,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - replayRefreshExternalPartitions(log); - Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log); - } - - public void replayRefreshExternalPartitions(ExternalObjectLog log) { - LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(), - log.getPartitionNames()); - ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache( + catalog.getId(), db.getFullName(), table.getName(), partitionNames); + ((HMSExternalTable) table).setEventUpdateTime(updateTime); } public void registerCatalogRefreshListener(Env env) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 3ff599d0ab..6fcd495b67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -521,13 +521,6 @@ public abstract class ExternalCatalog return null; } - /** - * External catalog has no cluster semantics. - */ - protected static String getRealTableName(String tableName) { - return ClusterNamespace.getNameFromFullName(tableName); - } - public static ExternalCatalog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ExternalCatalog.class); @@ -546,9 +539,6 @@ public abstract class ExternalCatalog db.setTableExtCatalog(this); } objectCreated = false; - if (this instanceof HMSExternalCatalog) { - ((HMSExternalCatalog) this).setLastSyncedEventId(-1L); - } // TODO: This code is to compatible with older version of metadata. // Could only remove after all users upgrate to the new version. if (logType == null) { @@ -569,11 +559,11 @@ public abstract class ExternalCatalog dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } - public void dropDatabaseForReplay(String dbName) { + public void dropDatabase(String dbName) { throw new NotImplementedException("dropDatabase not implemented"); } - public void createDatabaseForReplay(long dbId, String dbName) { + public void createDatabase(long dbId, String dbName) { throw new NotImplementedException("createDatabase not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java new file mode 100644 index 0000000000..621c25b369 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Env; +import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +/** + *
+ * ExternalMetaIdMgr is responsible for managing external meta ids.
+ * Now it just manages the external meta ids of hms events,
+ * but it will be extended to manage other external meta ids in the future.
+ * 
+ * TODO: remove InitCatalogLog and InitDatabaseLog, manage external meta ids at ExternalMetaIdMgr + */ +public class ExternalMetaIdMgr { + + private static final Logger LOG = LogManager.getLogger(ExternalMetaIdMgr.class); + + public static final long META_ID_FOR_NOT_EXISTS = -1L; + + private final Map idToCtlMgr = Maps.newConcurrentMap(); + + public ExternalMetaIdMgr() { + } + + // invoke this method only on master + public static long nextMetaId() { + return Env.getCurrentEnv().getNextId(); + } + + // return the db id of the specified db, -1 means not exists + public long getDbId(long catalogId, String dbName) { + DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName); + if (dbMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return dbMetaIdMgr.dbId; + } + + // return the tbl id of the specified tbl, -1 means not exists + public long getTblId(long catalogId, String dbName, String tblName) { + TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName); + if (tblMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return tblMetaIdMgr.tblId; + } + + // return the partition id of the specified partition, -1 means not exists + public long getPartitionId(long catalogId, String dbName, + String tblName, String partitionName) { + PartitionMetaIdMgr partitionMetaIdMgr = getPartitionMetaIdMgr(catalogId, dbName, tblName, partitionName); + if (partitionMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return partitionMetaIdMgr.partitionId; + } + + private @Nullable DbMetaIdMgr getDbMetaIdMgr(long catalogId, String dbName) { + CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.get(catalogId); + if (ctlMetaIdMgr == null) { + return null; + } + return ctlMetaIdMgr.dbNameToMgr.get(dbName); + } + + private @Nullable TblMetaIdMgr getTblMetaIdMgr(long catalogId, String dbName, String tblName) { + DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName); + if (dbMetaIdMgr == null) { + return null; + } + return dbMetaIdMgr.tblNameToMgr.get(tblName); + } + + private PartitionMetaIdMgr getPartitionMetaIdMgr(long catalogId, String dbName, + String tblName, String partitionName) { + TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName); + if (tblMetaIdMgr == null) { + return null; + } + return tblMetaIdMgr.partitionNameToMgr.get(partitionName); + } + + public void replayMetaIdMappingsLog(@NotNull MetaIdMappingsLog log) { + Preconditions.checkNotNull(log); + long catalogId = log.getCatalogId(); + CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, CtlMetaIdMgr::new); + for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) { + handleMetaIdMapping(mapping, ctlMetaIdMgr); + } + if (log.isFromHmsEvent()) { + CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId()); + if (catalogIf != null) { + MetastoreEventsProcessor metastoreEventsProcessor = Env.getCurrentEnv().getMetastoreEventsProcessor(); + metastoreEventsProcessor.updateMasterLastSyncedEventId( + (HMSExternalCatalog) catalogIf, log.getLastSyncedEventId()); + } + } + } + + // no lock because the operations is serialized currently + private void handleMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, CtlMetaIdMgr ctlMetaIdMgr) { + MetaIdMappingsLog.OperationType opType = MetaIdMappingsLog.getOperationType(mapping.getOpType()); + MetaIdMappingsLog.MetaObjectType objType = MetaIdMappingsLog.getMetaObjectType(mapping.getMetaObjType()); + switch (opType) { + case ADD: + handleAddMetaIdMapping(mapping, ctlMetaIdMgr, objType); + break; + + case DELETE: + handleDelMetaIdMapping(mapping, ctlMetaIdMgr, objType); + break; + + default: + break; + } + } + + private static void handleDelMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, + CtlMetaIdMgr ctlMetaIdMgr, + MetaIdMappingsLog.MetaObjectType objType) { + TblMetaIdMgr tblMetaIdMgr; + DbMetaIdMgr dbMetaIdMgr; + switch (objType) { + case DATABASE: + ctlMetaIdMgr.dbNameToMgr.remove(mapping.getDbName()); + break; + + case TABLE: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName()); + if (dbMetaIdMgr != null) { + dbMetaIdMgr.tblNameToMgr.remove(mapping.getTblName()); + } + break; + + case PARTITION: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName()); + if (dbMetaIdMgr != null) { + tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr.get(mapping.getTblName()); + if (tblMetaIdMgr != null) { + tblMetaIdMgr.partitionNameToMgr.remove(mapping.getPartitionName()); + } + } + break; + + default: + break; + } + } + + private static void handleAddMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, + CtlMetaIdMgr ctlMetaIdMgr, + MetaIdMappingsLog.MetaObjectType objType) { + DbMetaIdMgr dbMetaIdMgr; + TblMetaIdMgr tblMetaIdMgr; + switch (objType) { + case DATABASE: + ctlMetaIdMgr.dbNameToMgr.put(mapping.getDbName(), + new DbMetaIdMgr(mapping.getId(), mapping.getDbName())); + break; + + case TABLE: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr + .computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new); + dbMetaIdMgr.tblNameToMgr.put(mapping.getTblName(), + new TblMetaIdMgr(mapping.getId(), mapping.getTblName())); + break; + + case PARTITION: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr + .computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new); + tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr + .computeIfAbsent(mapping.getTblName(), TblMetaIdMgr::new); + tblMetaIdMgr.partitionNameToMgr.put(mapping.getPartitionName(), + new PartitionMetaIdMgr(mapping.getId(), mapping.getPartitionName())); + break; + + default: + break; + } + } + + public static class CtlMetaIdMgr { + protected final long catalogId; + + protected CtlMetaIdMgr(long catalogId) { + this.catalogId = catalogId; + } + + protected Map dbNameToMgr = Maps.newConcurrentMap(); + } + + public static class DbMetaIdMgr { + protected volatile long dbId = META_ID_FOR_NOT_EXISTS; + protected final String dbName; + + protected DbMetaIdMgr(long dbId, String dbName) { + this.dbId = dbId; + this.dbName = dbName; + } + + protected DbMetaIdMgr(String dbName) { + this.dbName = dbName; + } + + protected Map tblNameToMgr = Maps.newConcurrentMap(); + } + + public static class TblMetaIdMgr { + protected volatile long tblId = META_ID_FOR_NOT_EXISTS; + protected final String tblName; + + protected TblMetaIdMgr(long tblId, String tblName) { + this.tblId = tblId; + this.tblName = tblName; + } + + protected TblMetaIdMgr(String tblName) { + this.tblName = tblName; + } + + protected Map partitionNameToMgr = Maps.newConcurrentMap(); + } + + public static class PartitionMetaIdMgr { + protected volatile long partitionId = META_ID_FOR_NOT_EXISTS; + protected final String partitionName; + + protected PartitionMetaIdMgr(long partitionId, String partitionName) { + this.partitionId = partitionId; + this.partitionName = partitionName; + } + + protected PartitionMetaIdMgr(String partitionName) { + this.partitionName = partitionName; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index ae6b5f0473..dd6788ade2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -23,23 +23,19 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HMSCachedClientFactory; -import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,10 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final int MIN_CLIENT_POOL_SIZE = 8; protected HMSCachedClient client; - // Record the latest synced event id when processing hive events - // Must set to -1 otherwise client.getNextNotification will throw exception - // Reference to https://github.com/apDdlache/doris/issues/18251 - private long lastSyncedEventId = -1L; + public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter"; public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second"; // broker name for file split and query scan. @@ -132,18 +125,6 @@ public class HMSExternalCatalog extends ExternalCatalog { } } - public String getHiveMetastoreUris() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); - } - - public String getHiveVersion() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); - } - - protected List listDatabaseNames() { - return client.getAllDatabases(); - } - @Override protected void initLocalObjectsImpl() { HiveConf hiveConf = null; @@ -195,13 +176,13 @@ public class HMSExternalCatalog extends ExternalCatalog { hmsExternalDatabase.getTables().forEach(table -> names.add(table.getName())); return names; } else { - return client.getAllTables(getRealTableName(dbName)); + return client.getAllTables(ClusterNamespace.getNameFromFullName(dbName)); } } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { - return client.tableExists(getRealTableName(dbName), tblName); + return client.tableExists(ClusterNamespace.getNameFromFullName(dbName), tblName); } @Override @@ -211,7 +192,7 @@ public class HMSExternalCatalog extends ExternalCatalog { if (hmsExternalDatabase == null) { return false; } - return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent(); + return hmsExternalDatabase.getTable(ClusterNamespace.getNameFromFullName(tblName)).isPresent(); } public HMSCachedClient getClient() { @@ -219,69 +200,8 @@ public class HMSExternalCatalog extends ExternalCatalog { return client; } - public void setLastSyncedEventId(long lastSyncedEventId) { - this.lastSyncedEventId = lastSyncedEventId; - } - - public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog) - throws MetastoreNotificationFetchException { - makeSureInitialized(); - long currentEventId = getCurrentEventId(); - if (lastSyncedEventId < 0) { - refreshCatalog(hmsExternalCatalog); - // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events - // but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog - lastSyncedEventId = currentEventId; - LOG.info( - "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," - + "lastSyncedEventId is [{}]", - hmsExternalCatalog.getName(), lastSyncedEventId); - return null; - } - - LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}", - hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); - if (currentEventId == lastSyncedEventId) { - LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); - return null; - } - - try { - return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null); - } catch (MetastoreNotificationFetchException e) { - // Need a fallback to handle this because this error state can not be recovered until restarting FE - if (StringUtils.isNotEmpty(e.getMessage()) - && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { - refreshCatalog(hmsExternalCatalog); - // set lastSyncedEventId to currentEventId after refresh catalog successfully - lastSyncedEventId = currentEventId; - LOG.warn("Notification events are missing, maybe an event can not be handled " - + "or processing rate is too low, fallback to refresh the catalog"); - return null; - } - throw e; - } - } - - private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) { - CatalogLog log = new CatalogLog(); - log.setCatalogId(hmsExternalCatalog.getId()); - log.setInvalidCache(true); - Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log); - } - - private long getCurrentEventId() { - makeSureInitialized(); - CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId(); - if (currentNotificationEventId == null) { - LOG.warn("Get currentNotificationEventId is null"); - return -1; - } - return currentNotificationEventId.getEventId(); - } - @Override - public void dropDatabaseForReplay(String dbName) { + public void dropDatabase(String dbName) { LOG.debug("drop database [{}]", dbName); Long dbId = dbNameToId.remove(dbName); if (dbId == null) { @@ -291,7 +211,7 @@ public class HMSExternalCatalog extends ExternalCatalog { } @Override - public void createDatabaseForReplay(long dbId, String dbName) { + public void createDatabase(long dbId, String dbName) { LOG.debug("create database [{}]", dbName); dbNameToId.put(dbName, dbId); ExternalDatabase db = getDbForInit(dbName, dbId, logType); @@ -317,4 +237,17 @@ public class HMSExternalCatalog extends ExternalCatalog { catalogProperty.addProperty(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true"); } } + + public String getHiveMetastoreUris() { + return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); + } + + public String getHiveVersion() { + return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); + } + + protected List listDatabaseNames() { + return client.getAllDatabases(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java deleted file mode 100644 index 2f462b551c..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource; - -import org.apache.doris.catalog.Column; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.persist.gson.GsonUtils; - -import com.google.gson.annotations.SerializedName; -import lombok.Data; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -@Data -public class InitTableLog implements Writable { - enum Type { - HMS, - ES, - UNKNOWN; - } - - @SerializedName(value = "catalogId") - private long catalogId; - - @SerializedName(value = "dbId") - private long dbId; - - @SerializedName(value = "tableId") - private long tableId; - - @SerializedName(value = "type") - private Type type; - - @SerializedName(value = "schema") - protected volatile List schema; - - public InitTableLog() {} - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, GsonUtils.GSON.toJson(this)); - } - - public static InitTableLog read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, InitTableLog.class); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java new file mode 100644 index 0000000000..629b4d13a4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import lombok.Getter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +@Data +public class MetaIdMappingsLog implements Writable { + + public static final short OPERATION_TYPE_IGNORE = 0; + public static final short OPERATION_TYPE_ADD = 1; + public static final short OPERATION_TYPE_DELETE = 2; + + public static final short META_OBJECT_TYPE_IGNORE = 0; + public static final short META_OBJECT_TYPE_DATABASE = 1; + public static final short META_OBJECT_TYPE_TABLE = 2; + public static final short META_OBJECT_TYPE_PARTITION = 3; + + @SerializedName(value = "ctlId") + private long catalogId = -1L; + + @SerializedName(value = "fromEvent") + private boolean fromHmsEvent = false; + + // The synced event id of master + @SerializedName(value = "lastEventId") + private long lastSyncedEventId = -1L; + + @SerializedName(value = "metaIdMappings") + private List metaIdMappings = Lists.newLinkedList(); + + public MetaIdMappingsLog() { + } + + @Override + public int hashCode() { + return Objects.hash(catalogId, lastSyncedEventId, + metaIdMappings == null ? 0 : Arrays.hashCode(metaIdMappings.toArray())); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MetaIdMappingsLog)) { + return false; + } + return Objects.equals(this.catalogId, ((MetaIdMappingsLog) obj).catalogId) + && Objects.equals(this.fromHmsEvent, ((MetaIdMappingsLog) obj).fromHmsEvent) + && Objects.equals(this.lastSyncedEventId, ((MetaIdMappingsLog) obj).lastSyncedEventId) + && Objects.equals(this.metaIdMappings, ((MetaIdMappingsLog) obj).metaIdMappings); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static MetaIdMappingsLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, MetaIdMappingsLog.class); + } + + public void addMetaIdMapping(MetaIdMapping metaIdMapping) { + this.metaIdMappings.add(metaIdMapping); + } + + public void addMetaIdMappings(List metaIdMappings) { + this.metaIdMappings.addAll(metaIdMappings); + } + + public static OperationType getOperationType(short opType) { + switch (opType) { + case OPERATION_TYPE_ADD: + return OperationType.ADD; + case OPERATION_TYPE_DELETE: + return OperationType.DELETE; + default: + return OperationType.IGNORE; + } + } + + public static MetaObjectType getMetaObjectType(short metaObjType) { + switch (metaObjType) { + case META_OBJECT_TYPE_DATABASE: + return MetaObjectType.DATABASE; + case META_OBJECT_TYPE_TABLE: + return MetaObjectType.TABLE; + case META_OBJECT_TYPE_PARTITION: + return MetaObjectType.PARTITION; + default: + return MetaObjectType.IGNORE; + } + } + + @Getter + public static class MetaIdMapping { + + @SerializedName(value = "opType") + private short opType; + @SerializedName(value = "metaObjType") + private short metaObjType; + // name of Database + @SerializedName(value = "dbName") + private String dbName; + // name of Table + @SerializedName(value = "tblName") + private String tblName; + // name of Partition + @SerializedName(value = "pName") + private String partitionName; + // id of Database/Table/Partition + @SerializedName(value = "id") + private long id; + + public MetaIdMapping() {} + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + String partitionName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = partitionName; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + String partitionName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = partitionName; + this.id = -1L; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = null; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = null; + this.id = -1L; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = null; + this.partitionName = null; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = null; + this.partitionName = null; + this.id = -1L; + } + + @Override + public int hashCode() { + return Objects.hash(opType, metaObjType, dbName, tblName, partitionName, id); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MetaIdMapping)) { + return false; + } + return Objects.equals(this.opType, ((MetaIdMapping) obj).opType) + && Objects.equals(this.metaObjType, ((MetaIdMapping) obj).metaObjType) + && Objects.equals(this.dbName, ((MetaIdMapping) obj).dbName) + && Objects.equals(this.tblName, ((MetaIdMapping) obj).tblName) + && Objects.equals(this.partitionName, ((MetaIdMapping) obj).partitionName) + && Objects.equals(this.id, ((MetaIdMapping) obj).id); + } + + } + + public enum OperationType { + IGNORE(OPERATION_TYPE_IGNORE), + // Add a Database/Table/Partition + ADD(OPERATION_TYPE_ADD), + // Delete Database/Table/Partition + DELETE(OPERATION_TYPE_DELETE); + + private final short opType; + + OperationType(short opType) { + this.opType = opType; + } + + public short getOperationType() { + return opType; + } + } + + public enum MetaObjectType { + IGNORE(META_OBJECT_TYPE_IGNORE), + DATABASE(META_OBJECT_TYPE_DATABASE), + TABLE(META_OBJECT_TYPE_TABLE), + PARTITION(META_OBJECT_TYPE_PARTITION); + + private final short metaObjType; + + MetaObjectType(short metaObjType) { + this.metaObjType = metaObjType; + } + + public short getMetaObjectType() { + return metaObjType; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index e1dacbd0b2..ffc7b95ff5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.common.FileUtils; @@ -109,4 +112,17 @@ public class AddPartitionEvent extends MetastorePartitionEvent { debugString("Failed to process event"), e); } } + + @Override + protected List transferToMetaIdMappings() { + List metaIdMappings = Lists.newArrayList(); + for (String partitionName : this.getAllPartitionNames()) { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + dbName, tblName, partitionName, ExternalMetaIdMgr.nextMetaId()); + metaIdMappings.add(metaIdMapping); + } + return ImmutableList.copyOf(metaIdMappings); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index 6de71fbbc5..c69812a22b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -100,7 +100,8 @@ public class AlterTableEvent extends MetastoreTableEvent { Env.getCurrentEnv().getCatalogMgr() .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true); + .createExternalTableFromEvent( + tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true); } private void processRename() throws DdlException { @@ -118,7 +119,8 @@ public class AlterTableEvent extends MetastoreTableEvent { Env.getCurrentEnv().getCatalogMgr() .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true); + .createExternalTableFromEvent( + tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index 42d813319c..d79d23824a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -59,4 +62,13 @@ public class CreateDatabaseEvent extends MetastoreEvent { debugString("Failed to process event"), e); } } + + @Override + protected List transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + dbName, ExternalMetaIdMgr.nextMetaId()); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index 4c3615fbda..246ce8626f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -19,8 +19,11 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Table; @@ -77,10 +80,19 @@ public class CreateTableEvent extends MetastoreTableEvent { try { infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, true); + .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); } } + + @Override + protected List transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + dbName, tblName, ExternalMetaIdMgr.nextMetaId()); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java index 3481f832fe..ca69e6f14d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -59,4 +61,13 @@ public class DropDatabaseEvent extends MetastoreEvent { debugString("Failed to process event"), e); } } + + @Override + protected List transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + dbName); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index f71f44cf5a..dd44301028 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -124,7 +126,7 @@ public class DropPartitionEvent extends MetastorePartitionEvent { return false; } - // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has + // `that` event can be batched if this event's partitions contains all the partitions which `that` event has // else just remove `that` event's relevant partitions for (String partitionName : getAllPartitionNames()) { if (thatPartitionEvent instanceof AddPartitionEvent) { @@ -136,4 +138,17 @@ public class DropPartitionEvent extends MetastorePartitionEvent { return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames()); } + + @Override + protected List transferToMetaIdMappings() { + List metaIdMappings = Lists.newArrayList(); + for (String partitionName : this.getAllPartitionNames()) { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + dbName, tblName, partitionName); + metaIdMappings.add(metaIdMapping); + } + return ImmutableList.copyOf(metaIdMappings); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index c333506cad..0f62e24608 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage; @@ -89,14 +91,23 @@ public class DropTableEvent extends MetastoreTableEvent { return false; } - /** + /* * Check if `that` event is a rename event, a rename event can not be batched * because the process of `that` event will change the reference relation of this table, * otherwise it can be batched because this event is a drop-table event * and the process of this event will drop the whole table, * and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false - * */ + */ MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that; return !thatTblEvent.willChangeTableName(); } + + @Override + protected List transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + dbName, tblName); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index f9771562ed..a9d165b4d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.datasource.hive.HMSCachedClient; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -227,6 +229,13 @@ public abstract class MetastoreEvent { return name.toString(); } + /** + * Create a MetaIdMapping list from the event if the event is a create/add/drop event + */ + protected List transferToMetaIdMappings() { + return ImmutableList.of(); + } + @Override public String toString() { return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index aabc562dba..a3ba092703 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -18,7 +18,9 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.catalog.Env; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -77,23 +79,39 @@ public class MetastoreEventFactory implements EventFactory { for (NotificationEvent event : events) { metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName)); } - return createBatchEvents(catalogName, metastoreEvents); + List mergedEvents = mergeEvents(catalogName, metastoreEvents); + if (Env.getCurrentEnv().isMaster()) { + logMetaIdMappings(hmsExternalCatalog.getId(), events.get(events.size() - 1).getEventId(), mergedEvents); + } + return mergedEvents; + } + + private void logMetaIdMappings(long catalogId, long lastSyncedEventId, List mergedEvents) { + MetaIdMappingsLog log = new MetaIdMappingsLog(); + log.setCatalogId(catalogId); + log.setFromHmsEvent(true); + log.setLastSyncedEventId(lastSyncedEventId); + for (MetastoreEvent event : mergedEvents) { + log.addMetaIdMappings(event.transferToMetaIdMappings()); + } + Env.getCurrentEnv().getExternalMetaIdMgr().replayMetaIdMappingsLog(log); + Env.getCurrentEnv().getEditLog().logMetaIdMappingsLog(log); } /** * Merge events to reduce the cost time on event processing, currently mainly handles MetastoreTableEvent * because merge MetastoreTableEvent is simple and cost-effective. * For example, consider there are some events as following: - * + *
      *    event1: alter table db1.t1 add partition p1;
      *    event2: alter table db1.t1 drop partition p2;
      *    event3: alter table db1.t2 add partition p3;
      *    event4: alter table db2.t3 rename to t4;
      *    event5: drop table db1.t1;
-     *
+     * 
* Only `event3 event4 event5` will be reserved and other events will be skipped. * */ - public List createBatchEvents(String catalogName, List events) { + public List mergeEvents(String catalogName, List events) { List eventsCopy = Lists.newArrayList(events); Map> indexMap = Maps.newLinkedHashMap(); for (int i = 0; i < events.size(); i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 622d84428f..28793aecf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -18,13 +18,22 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; +import org.apache.doris.qe.OriginStatement; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; @@ -35,6 +44,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; +import java.util.Map; /** * A metastore event is a instance of the class @@ -68,6 +78,13 @@ public class MetastoreEventsProcessor extends MasterDaemon { // event factory which is used to get or create MetastoreEvents private final MetastoreEventFactory metastoreEventFactory; + // manager the lastSyncedEventId of hms catalogs + // use HashMap is fine because all operations are in one thread + private final Map lastSyncedEventIdMap = Maps.newHashMap(); + + // manager the masterLastSyncedEventId of hms catalogs + private final Map masterLastSyncedEventIdMap = Maps.newHashMap(); + private boolean isRunning; public MetastoreEventsProcessor() { @@ -76,49 +93,6 @@ public class MetastoreEventsProcessor extends MasterDaemon { this.isRunning = false; } - /** - * Fetch the next batch of NotificationEvents from metastore. The default batch size is - * {@link Config#hms_events_batch_size_per_rpc} - */ - private List getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) { - LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName()); - NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog); - - if (response == null) { - return Collections.emptyList(); - } - return response.getEvents(); - } - - private void doExecute(List events, HMSExternalCatalog hmsExternalCatalog) { - for (MetastoreEvent event : events) { - try { - event.process(); - } catch (HMSClientException hmsClientException) { - if (hmsClientException.getCause() != null - && hmsClientException.getCause() instanceof NoSuchObjectException) { - LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException); - } else { - hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); - throw hmsClientException; - } - } catch (Exception e) { - hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); - throw e; - } - } - } - - /** - * Process the given list of notification events. Useful for tests which provide a list of events - */ - private void processEvents(List events, HMSExternalCatalog hmsExternalCatalog) { - //transfer - List metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog); - doExecute(metastoreEvents, hmsExternalCatalog); - hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId()); - } - @Override protected void runAfterCatalogReady() { if (isRunning) { @@ -157,6 +131,189 @@ public class MetastoreEventsProcessor extends MasterDaemon { } } + /** + * Fetch the next batch of NotificationEvents from metastore. The default batch size is + * {@link Config#hms_events_batch_size_per_rpc} + */ + private List getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) throws Exception { + LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName()); + NotificationEventResponse response; + if (Env.getCurrentEnv().isMaster()) { + response = getNextEventResponseForMaster(hmsExternalCatalog); + } else { + response = getNextEventResponseForSlave(hmsExternalCatalog); + } + + if (response == null) { + return Collections.emptyList(); + } + return response.getEvents(); + } + + private void doExecute(List events, HMSExternalCatalog hmsExternalCatalog) { + for (MetastoreEvent event : events) { + try { + event.process(); + } catch (HMSClientException hmsClientException) { + if (hmsClientException.getCause() != null + && hmsClientException.getCause() instanceof NoSuchObjectException) { + LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException); + } else { + updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1); + throw hmsClientException; + } + } catch (Exception e) { + updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1); + throw e; + } + } + } + + /** + * Process the given list of notification events. Useful for tests which provide a list of events + */ + private void processEvents(List events, HMSExternalCatalog hmsExternalCatalog) { + //transfer + List metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog); + doExecute(metastoreEvents, hmsExternalCatalog); + updateLastSyncedEventId(hmsExternalCatalog, events.get(events.size() - 1).getEventId()); + } + + private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog) + throws MetastoreNotificationFetchException { + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); + long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); + if (lastSyncedEventId < 0) { + refreshCatalogForMaster(hmsExternalCatalog); + // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events + // but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); + LOG.info( + "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + + "lastSyncedEventId is [{}]", + hmsExternalCatalog.getName(), lastSyncedEventId); + return null; + } + + LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {}, lastSyncedEventId is {}", + hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); + if (currentEventId == lastSyncedEventId) { + LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + + try { + return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, + Config.hms_events_batch_size_per_rpc, null); + } catch (MetastoreNotificationFetchException e) { + // Need a fallback to handle this because this error state can not be recovered until restarting FE + if (StringUtils.isNotEmpty(e.getMessage()) + && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { + refreshCatalogForMaster(hmsExternalCatalog); + // set lastSyncedEventId to currentEventId after refresh catalog successfully + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); + LOG.warn("Notification events are missing, maybe an event can not be handled " + + "or processing rate is too low, fallback to refresh the catalog"); + return null; + } + throw e; + } + } + + private NotificationEventResponse getNextEventResponseForSlave(HMSExternalCatalog hmsExternalCatalog) + throws Exception { + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); + long masterLastSyncedEventId = getMasterLastSyncedEventId(hmsExternalCatalog); + // do nothing if masterLastSyncedEventId has not been synced + if (masterLastSyncedEventId == -1L) { + LOG.info("LastSyncedEventId of master has not been synced on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + // do nothing if lastSyncedEventId is equals to masterLastSyncedEventId + if (lastSyncedEventId == masterLastSyncedEventId) { + LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + + if (lastSyncedEventId < 0) { + refreshCatalogForSlave(hmsExternalCatalog); + // Use masterLastSyncedEventId to avoid missing events + updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId); + LOG.info( + "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + + "lastSyncedEventId is [{}]", + hmsExternalCatalog.getName(), lastSyncedEventId); + return null; + } + + LOG.debug("Catalog [{}] getNextEventResponse, masterLastSyncedEventId is {}, lastSyncedEventId is {}", + hmsExternalCatalog.getName(), masterLastSyncedEventId, lastSyncedEventId); + + // For slave FE nodes, only fetch events which id is lower than masterLastSyncedEventId + int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId), + Config.hms_events_batch_size_per_rpc); + try { + return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, maxEventSize, null); + } catch (MetastoreNotificationFetchException e) { + // Need a fallback to handle this because this error state can not be recovered until restarting FE + if (StringUtils.isNotEmpty(e.getMessage()) + && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { + refreshCatalogForMaster(hmsExternalCatalog); + // set masterLastSyncedEventId to lastSyncedEventId after refresh catalog successfully + updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId); + LOG.warn("Notification events are missing, maybe an event can not be handled " + + "or processing rate is too low, fallback to refresh the catalog"); + return null; + } + throw e; + } + } + + private long getCurrentHmsEventId(HMSExternalCatalog hmsExternalCatalog) { + CurrentNotificationEventId currentNotificationEventId = hmsExternalCatalog.getClient() + .getCurrentNotificationEventId(); + if (currentNotificationEventId == null) { + LOG.warn("Get currentNotificationEventId is null"); + return -1L; + } + return currentNotificationEventId.getEventId(); + } + + private long getLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) { + // Returns to -1 if not exists, otherwise client.getNextNotification will throw exception + // Reference to https://github.com/apDdlache/doris/issues/18251 + return lastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L); + } + + private void updateLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) { + lastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId); + } + + private long getMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) { + return masterLastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L); + } + + public void updateMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) { + masterLastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId); + } + + private void refreshCatalogForMaster(HMSExternalCatalog hmsExternalCatalog) { + CatalogLog log = new CatalogLog(); + log.setCatalogId(hmsExternalCatalog.getId()); + log.setInvalidCache(true); + Env.getCurrentEnv().getCatalogMgr().replayRefreshCatalog(log); + } + + private void refreshCatalogForSlave(HMSExternalCatalog hmsExternalCatalog) throws Exception { + // Transfer to master to refresh catalog + String sql = "REFRESH CATALOG " + hmsExternalCatalog.getName(); + OriginStatement originStmt = new OriginStatement(sql, 0); + MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, new ConnectContext(), + RedirectStatus.FORWARD_WITH_SYNC, false); + LOG.debug("Transfer to master to refresh catalog, stmt: {}", sql); + masterOpExecutor.execute(); + } + public static MessageDeserializer getMessageDeserializer(String messageFormat) { if (messageFormat != null && messageFormat.startsWith(GZIP_JSON_FORMAT_PREFIX)) { return GZIP_JSON_MESSAGE_DESERIALIZER; diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 1bedda0a7e..d74c519407 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -42,7 +42,7 @@ import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.doris.datasource.InitTableLog; +import org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; @@ -746,19 +746,18 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_INIT_EXTERNAL_TABLE: { - data = InitTableLog.read(in); - isRead = true; - break; - } - case OperationType.OP_REFRESH_EXTERNAL_DB: + case OperationType.OP_INIT_EXTERNAL_TABLE: case OperationType.OP_DROP_EXTERNAL_TABLE: case OperationType.OP_CREATE_EXTERNAL_TABLE: case OperationType.OP_DROP_EXTERNAL_DB: case OperationType.OP_CREATE_EXTERNAL_DB: case OperationType.OP_ADD_EXTERNAL_PARTITIONS: case OperationType.OP_DROP_EXTERNAL_PARTITIONS: - case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: + case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: { + isRead = true; + break; + } + case OperationType.OP_REFRESH_EXTERNAL_DB: case OperationType.OP_REFRESH_EXTERNAL_TABLE: { data = ExternalObjectLog.read(in); isRead = true; @@ -906,6 +905,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_ADD_META_ID_MAPPINGS: { + data = MetaIdMappingsLog.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 8c2424d483..958277dd6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -51,6 +51,7 @@ import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; @@ -1011,38 +1012,24 @@ public class EditLog { break; } case OperationType.OP_DROP_EXTERNAL_TABLE: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalTable(log); break; } case OperationType.OP_CREATE_EXTERNAL_TABLE: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayCreateExternalTableFromEvent(log); break; } case OperationType.OP_DROP_EXTERNAL_DB: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalDatabase(log); break; } case OperationType.OP_CREATE_EXTERNAL_DB: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayCreateExternalDatabase(log); break; } case OperationType.OP_ADD_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayAddExternalPartitions(log); break; } case OperationType.OP_DROP_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalPartitions(log); break; } case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayRefreshExternalPartitions(log); break; } case OperationType.OP_CREATE_WORKLOAD_GROUP: { @@ -1165,6 +1152,10 @@ public class EditLog { env.getBackupHandler().getRepoMgr().alterRepo(repository, true); break; } + case OperationType.OP_ADD_META_ID_MAPPINGS: { + env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) journal.getData()); + break; + } case OperationType.OP_LOG_UPDATE_ROWS: case OperationType.OP_LOG_NEW_PARTITION_LOADED: case OperationType.OP_LOG_ALTER_COLUMN_STATS: { @@ -1886,26 +1877,32 @@ public class EditLog { logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log); } + @Deprecated public void logDropExternalTable(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log); } + @Deprecated public void logCreateExternalTable(ExternalObjectLog log) { logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log); } + @Deprecated public void logDropExternalDatabase(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_DB, log); } + @Deprecated public void logCreateExternalDatabase(ExternalObjectLog log) { logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log); } + @Deprecated public void logAddExternalPartitions(ExternalObjectLog log) { logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log); } + @Deprecated public void logDropExternalPartitions(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log); } @@ -2005,6 +2002,10 @@ public class EditLog { logEdit(OperationType.OP_INSERT_OVERWRITE, log); } + public void logMetaIdMappingsLog(MetaIdMappingsLog log) { + logEdit(OperationType.OP_ADD_META_ID_MAPPINGS, log); + } + public String getNotReadyReason() { if (journal == null) { return "journal is null"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 0945dc0f15..0868d7f371 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -288,12 +288,19 @@ public class OperationType { public static final short OP_ADD_CONSTRAINT = 346; public static final short OP_DROP_CONSTRAINT = 347; + @Deprecated public static final short OP_DROP_EXTERNAL_TABLE = 350; + @Deprecated public static final short OP_DROP_EXTERNAL_DB = 351; + @Deprecated public static final short OP_CREATE_EXTERNAL_TABLE = 352; + @Deprecated public static final short OP_CREATE_EXTERNAL_DB = 353; + @Deprecated public static final short OP_ADD_EXTERNAL_PARTITIONS = 354; + @Deprecated public static final short OP_DROP_EXTERNAL_PARTITIONS = 355; + @Deprecated public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356; public static final short OP_ALTER_USER = 400; @@ -364,6 +371,8 @@ public class OperationType { public static final short OP_LOG_ALTER_COLUMN_STATS = 464; + public static final short OP_ADD_META_ID_MAPPINGS = 470; + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java new file mode 100644 index 0000000000..12e018a4cf --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ExternalMetaIdMgrTest { + + @Test + public void testReplayMetaIdMappingsLog() { + ExternalMetaIdMgr mgr = new ExternalMetaIdMgr(); + MetaIdMappingsLog log1 = new MetaIdMappingsLog(); + log1.setCatalogId(1L); + log1.setFromHmsEvent(false); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log1); + Assertions.assertNotEquals(-1L, mgr.getDbId(1L, "db1")); + + MetaIdMappingsLog log2 = new MetaIdMappingsLog(); + log2.setCatalogId(1L); + log2.setFromHmsEvent(false); + log2.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1")); + mgr.replayMetaIdMappingsLog(log2); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + + MetaIdMappingsLog log3 = new MetaIdMappingsLog(); + log3.setCatalogId(1L); + log3.setFromHmsEvent(false); + log3.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log3); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + Assertions.assertNotEquals(-1L, mgr.getTblId(1L, "db1", "tbl1")); + + MetaIdMappingsLog log4 = new MetaIdMappingsLog(); + log4.setCatalogId(1L); + log4.setFromHmsEvent(false); + log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1")); + log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log4); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + Assertions.assertEquals(-1L, mgr.getTblId(1L, "db1", "tbl1")); + Assertions.assertNotEquals(-1L, mgr.getPartitionId(1L, "db1", "tbl1", "p1")); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java new file mode 100644 index 0000000000..fec57c29ed --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class MetaIdMappingsLogTest { + + @Test + public void testSerialization() throws Exception { + // 1. Write objects to file + MetaIdMappingsLog log1 = new MetaIdMappingsLog(); + Path path = Files.createFile(Paths.get("./metaIdMappingsLogTest.txt")); + try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) { + log1.setFromHmsEvent(true); + log1.setLastSyncedEventId(-1L); + log1.setCatalogId(1L); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl2", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db2")); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db2")); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId())); + log1.write(dos); + dos.flush(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + Files.deleteIfExists(path); + Assertions.fail(); + } + + // 2. Read objects from file + MetaIdMappingsLog log2; + try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) { + log2 = MetaIdMappingsLog.read(dis); + Assertions.assertEquals(log1, log2); + } catch (Throwable throwable) { + throwable.printStackTrace(); + Assertions.fail(); + } finally { + Files.deleteIfExists(path); + } + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java index c9e566dc9d..136cac6b71 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java @@ -454,7 +454,7 @@ public class MetastoreEventFactoryTest { for (int j = 0; j < 1000; j++) { events.add(producer.produceOneEvent(j)); } - List mergedEvents = factory.createBatchEvents(testCtl, events); + List mergedEvents = factory.mergeEvents(testCtl, events); for (MetastoreEvent event : events) { processEvent(validateCatalog, event);