[enhance-wip](multi-catalog) Speed up consume rate of hms events. (#27666)

## Proposed changes

The current implementation persists all catalogs/databases of external catalogs, and only the master FE handles hms events; all slave nodes then replay these events. This causes several problems:

- The hms event processor (`MetastoreEventsProcessor`) cannot consume events in time. Adding a journal log is a synchronized method, so concurrent processing cannot speed up the consume rate, and each add-journal-log operation costs tens of milliseconds. As a result, the Hive meta info may be out of date.

- Slave FE nodes may crash if replaying the journal logs of hms events fails. (We have fixed some issues of this kind, but we cannot be sure that all of them have been resolved.)

- hms events produce many journal logs, but these logs are no longer used after an FE restart. They make the start time of all FE nodes very long.
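
The first problem in the list above can be illustrated with a minimal sketch. `SyncJournalSketch` and `Journal` are hypothetical names, and the 10 ms write cost is an assumed value rather than a measurement from Doris. The sketch only shows that callers of a synchronized method run one at a time, so adding worker threads does not shorten the total time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not the Doris EditLog.
class SyncJournalSketch {
    // Stand-in for a journal whose append method is synchronized.
    static final class Journal {
        synchronized void append(long costMillis) {
            try {
                Thread.sleep(costMillis); // simulated write latency (assumed value)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Appends `count` entries using `threads` workers and returns the elapsed milliseconds.
    static long run(int threads, int count, long costMillis) {
        Journal journal = new Journal();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            pool.submit(() -> journal.append(costMillis));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Both runs hold the same lock for 20 x 10 ms in total,
        // so the 8-thread run is not expected to be meaningfully faster.
        System.out.println("1 thread:  " + run(1, 20, 10) + " ms");
        System.out.println("8 threads: " + run(8, 20, 10) + " ms");
    }
}
```

Under these assumptions, the serialized cost per entry caps throughput at roughly 1000 / cost-in-ms entries per second, regardless of the thread count.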

Currently Doris persists all databases/tables of external catalogs only to make sure that the dbId/tableId of each database/table is the same across all FE nodes; these ids are used by analysis jobs.

In this PR, we use a meta id manager called `ExternalMetaIdMgr` to manage these meta ids. On every loop, when the master fetches a batch of hms events, it handles the meta ids first and produces only one meta id mappings log; slave FE nodes replay this log to sync the changes to these meta ids. `MetastoreEventsProcessor` starts on every FE node and tries to consume these hms events as soon as possible.
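
A simplified sketch of this idea follows. It is not the code in this PR: `MetaIdSketch`, `Mapping`, `Node`, and `buildLog` are hypothetical names, only table ids are modeled, and events are reduced to `{op, db, table}` triples. It shows one log being built per batch and every node arriving at the same name-to-id mapping by replaying it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the id-mapping idea; not the Doris classes.
class MetaIdSketch {
    static final long NOT_EXISTS = -1L;

    enum Op { ADD, DELETE }

    // One entry of the batch log: a table name that gained or lost an id.
    static final class Mapping {
        final Op op;
        final String db;
        final String tbl;
        final long id; // only meaningful for ADD

        Mapping(Op op, String db, String tbl, long id) {
            this.op = op;
            this.db = db;
            this.tbl = tbl;
            this.id = id;
        }
    }

    // Per-node state: db name -> (table name -> id).
    static final class Node {
        private final Map<String, Map<String, Long>> ids = new HashMap<>();

        // Applies every mapping of one batch log in order.
        void replay(List<Mapping> log) {
            for (Mapping m : log) {
                if (m.op == Op.ADD) {
                    ids.computeIfAbsent(m.db, k -> new HashMap<>()).put(m.tbl, m.id);
                } else {
                    Map<String, Long> tbls = ids.get(m.db);
                    if (tbls != null) {
                        tbls.remove(m.tbl);
                    }
                }
            }
        }

        // Returns the table id, or NOT_EXISTS when the name is unknown.
        long getTblId(String db, String tbl) {
            Map<String, Long> tbls = ids.get(db);
            return tbls == null ? NOT_EXISTS : tbls.getOrDefault(tbl, NOT_EXISTS);
        }
    }

    // Master side: turns a whole batch of {op, db, tbl} events into ONE log.
    static List<Mapping> buildLog(List<String[]> events, long firstId) {
        List<Mapping> log = new ArrayList<>();
        long nextId = firstId;
        for (String[] e : events) {
            Op op = Op.valueOf(e[0]);
            long id = (op == Op.ADD) ? nextId++ : NOT_EXISTS;
            log.add(new Mapping(op, e[1], e[2], id));
        }
        return log;
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
                new String[] {"ADD", "db1", "t1"},
                new String[] {"ADD", "db1", "t2"},
                new String[] {"DELETE", "db1", "t1"});
        List<Mapping> log = buildLog(batch, 1000L);

        Node master = new Node();
        Node follower = new Node();
        master.replay(log);
        follower.replay(log);

        // Same id on both nodes for a table that survives the batch.
        System.out.println(master.getTblId("db1", "t2") == follower.getTblId("db1", "t2"));
        // A table created and dropped within the batch resolves to NOT_EXISTS.
        System.out.println(follower.getTblId("db1", "t1"));
    }
}
```

In this sketch a table that is created and then dropped within the same batch resolves to `NOT_EXISTS` after replay, which mirrors the `-1L` check that lets event handlers skip objects that will be dropped later.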

## Further comments

I previously submitted two PRs ( #22869 #21589 ) to speed up the consume rate of hms events. They work well when there are many `AlterTableEvent` / `DropTableEvent` events on the Hive cluster, but the improvement is not that significant when most hms events are partition events. Unfortunately, we performed a cluster upgrade (Spark 2.x to Spark 3.x), which may be the reason that the majority of Hive Metastore events became partition events. This is also the reason for this pull request.

Based on our observation, after merging this pull request, Doris is capable of processing thousands of Hive Metastore events per second, compared with only a few dozen events per second before.

```text
2023-12-07 05:17:03,518 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287902, replay to journal id is 18287903
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [1947], after merge is [1849]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955309 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-05-27],partitionNameAfter:[partitions=2022-05-27]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955310 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230318],partitionNameAfter:[pday=20230318]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955311 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20190826],partitionNameAfter:[pday=20190826]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955312 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-09-16],partitionNameAfter:[partitions=2021-09-16]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955314 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2020-04-26],partitionNameAfter:[partitions=2020-04-26]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955315 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230702],partitionNameAfter:[pday=20230702]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955317 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20211019],partitionNameAfter:[pday=20211019]
...
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957252 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-08-27],partitionNameAfter:[partitions=2021-08-27]
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957253 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-02-05],partitionNameAfter:[partitions=2022-02-05]
2023-12-07 05:17:04,661 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287903, replay to journal id is 18287904
2023-12-07 05:17:05,028 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventsProcessor.realRun():116] Events size are 587 on catalog [xxx]
2023-12-07 05:17:05,662 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [587], after merge is [587]
```
Xiangyu Wang
2024-01-19 15:24:12 +08:00
committed by yiguolei
parent f66f6b2a82
commit 8a75da0fec
27 changed files with 1147 additions and 495 deletions

View File

@ -131,6 +131,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
@ -362,6 +363,7 @@ public class Env {
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
private CooldownConfHandler cooldownConfHandler;
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;
private ExportTaskRegister exportTaskRegister;
@ -649,6 +651,7 @@ public class Env {
if (Config.enable_storage_policy) {
this.cooldownConfHandler = new CooldownConfHandler();
}
this.externalMetaIdMgr = new ExternalMetaIdMgr();
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
@ -844,6 +847,14 @@ public class Env {
return workloadRuntimeStatusMgr;
}
public ExternalMetaIdMgr getExternalMetaIdMgr() {
return externalMetaIdMgr;
}
public MetastoreEventsProcessor getMetastoreEventsProcessor() {
return metastoreEventsProcessor;
}
// use this to get correct ClusterInfoService instance
public static SystemInfoService getCurrentSystemInfo() {
return getCurrentEnv().getClusterInfo();
@ -1638,9 +1649,6 @@ public class Env {
streamLoadRecordMgr.start();
tabletLoadIndexRecorderMgr.start();
new InternalSchemaInitializer().start();
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}
getRefreshManager().start();
// binlog gcer
@ -1662,6 +1670,9 @@ public class Env {
domainResolver.start();
// fe disk updater
feDiskUpdater.start();
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}
}
private void transferToNonMaster(FrontendNodeType newType) {

View File

@ -146,8 +146,15 @@ public abstract class ExternalDatabase<T extends ExternalTable>
Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
T table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
// When upgrade cluster with this pr: https://github.com/apache/doris/pull/27666
// Maybe there are some create table events will be skipped
// if the cluster has any hms catalog(s) with hms event listener enabled.
// So we need add a validation here to avoid table(s) not found, this is just a temporary solution
// because later we will remove all the logics about InitCatalogLog/InitDatabaseLog.
if (table != null) {
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
}
for (int i = 0; i < log.getCreateCount(); i++) {
T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog);
@ -195,8 +202,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
idToTbl = tmpIdToTbl;
}
long currentTime = System.currentTimeMillis();
lastUpdateTime = currentTime;
lastUpdateTime = System.currentTimeMillis();
initDatabaseLog.setLastUpdateTime(lastUpdateTime);
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
@ -370,17 +376,13 @@ public abstract class ExternalDatabase<T extends ExternalTable>
throw new NotImplementedException("dropTable() is not implemented");
}
public void dropTableForReplay(String tableName) {
throw new NotImplementedException("replayDropTableFromEvent() is not implemented");
}
@Override
public CatalogIf getCatalog() {
return extCatalog;
}
// Only used for sync hive metastore event
public void createTableForReplay(String tableName, long tableId) {
public void createTable(String tableName, long tableId) {
throw new NotImplementedException("createTable() is not implemented");
}

View File

@ -64,17 +64,6 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
@Override
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
makeSureInitialized();
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
LOG.warn("drop table [{}] failed", tableName);
}
idToTbl.remove(tableId);
}
@Override
public void dropTableForReplay(String tableName) {
LOG.debug("replayDropTableFromEvent [{}]", tableName);
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
@ -85,7 +74,7 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
}
@Override
public void createTableForReplay(String tableName, long tableId) {
public void createTable(String tableName, long tableId) {
LOG.debug("create table [{}]", tableName);
tableNameToId.put(tableName, tableId);
HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog);

View File

@ -49,7 +49,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
}
@Override
public void dropTableForReplay(String tableName) {
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
@ -59,7 +59,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
}
@Override
public void createTableForReplay(String tableName, long tableId) {
public void createTable(String tableName, long tableId) {
LOG.debug("create table [{}]", tableName);
tableNameToId.put(tableName, tableId);
IcebergExternalTable table = new IcebergExternalTable(tableId, tableName, name,

View File

@ -49,7 +49,7 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable
}
@Override
public void dropTableForReplay(String tableName) {
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
@ -59,7 +59,7 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable
}
@Override
public void createTableForReplay(String tableName, long tableId) {
public void createTable(String tableName, long tableId) {
LOG.debug("create table [{}]", tableName);
tableNameToId.put(tableName, tableId);
PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name,

View File

@ -32,6 +32,7 @@ import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@ -344,10 +345,10 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
writeLock();
try {
CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
Map<String, String> oldProperties = catalog.getProperties();
if (catalog == null) {
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
}
Map<String, String> oldProperties = catalog.getProperties();
if (stmt.getNewProperties().containsKey("type") && !catalog.getType()
.equalsIgnoreCase(stmt.getNewProperties().get("type"))) {
throw new DdlException("Can't modify the type of catalog property with name: " + stmt.getCatalogName());
@ -681,12 +682,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
((HMSExternalTable) table).unsetObjectCreated();
((HMSExternalTable) table).setEventUpdateTime(updateTime);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setLastUpdateTime(updateTime);
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists)
@ -772,43 +767,16 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
}
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setLastUpdateTime(System.currentTimeMillis());
replayDropExternalTable(log);
Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
}
public void replayDropExternalTable(ExternalObjectLog log) {
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
db.writeLock();
try {
db.dropTableForReplay(table.getName());
db.setLastUpdateTime(log.getLastUpdateTime());
db.dropTable(table.getName());
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(
catalog.getId(), db.getFullName(), table.getName());
((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis());
} finally {
db.writeUnlock();
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
}
public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException {
@ -822,9 +790,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName);
}
public void createExternalTableFromEvent(String dbName, String tableName, String catalogName,
boolean ignoreIfExists)
throws DdlException {
public void createExternalTableFromEvent(String dbName, String tableName,
String catalogName, long updateTime,
boolean ignoreIfExists) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
@ -847,33 +815,21 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
}
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableName(tableName);
log.setTableId(Env.getCurrentEnv().getNextId());
log.setLastUpdateTime(System.currentTimeMillis());
replayCreateExternalTableFromEvent(log);
Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
}
public void replayCreateExternalTableFromEvent(ExternalObjectLog log) {
LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId(), log.getTableName());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
long tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName);
// -1L means it will be dropped later, ignore
if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
return;
}
db.writeLock();
try {
db.createTableForReplay(log.getTableName(), log.getTableId());
db.setLastUpdateTime(log.getLastUpdateTime());
((HMSExternalDatabase) db).createTable(tableName, tblId);
((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis());
table = db.getTableNullable(tableName);
if (table != null) {
((HMSExternalTable) table).setEventUpdateTime(updateTime);
}
} finally {
db.writeUnlock();
}
@ -895,34 +851,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setInvalidCache(true);
replayDropExternalDatabase(log);
Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log);
}
public void replayDropExternalDatabase(ExternalObjectLog log) {
writeLock();
try {
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
catalog.dropDatabaseForReplay(db.getFullName());
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName());
} finally {
writeUnlock();
}
((HMSExternalCatalog) catalog).dropDatabase(dbName);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), dbName);
}
public void createExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) throws DdlException {
@ -941,28 +871,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(Env.getCurrentEnv().getNextId());
log.setDbName(dbName);
replayCreateExternalDatabase(log);
Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log);
}
public void replayCreateExternalDatabase(ExternalObjectLog log) {
writeLock();
try {
LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(),
log.getDbId(), log.getDbName());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
catalog.createDatabaseForReplay(log.getDbId(), log.getDbName());
} finally {
writeUnlock();
long dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName);
// -1L means it will be dropped later, ignore
if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
return;
}
((HMSExternalCatalog) catalog).createDatabase(dbId, dbName);
}
public void addExternalPartitions(String catalogName, String dbName, String tableName,
@ -998,48 +913,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
hmsTable.setEventUpdateTime(updateTime);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(updateTime);
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
}
public void replayAddExternalPartitions(ExternalObjectLog log) {
LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
HMSExternalTable hmsTable = (HMSExternalTable) table;
try {
Env.getCurrentEnv().getExtMetaCacheMgr()
.addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
} catch (HMSClientException e) {
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
db.getFullName(), table.getName());
}
}
public void dropExternalPartitions(String catalogName, String dbName, String tableName,
@ -1068,42 +941,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(updateTime);
replayDropExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
}
public void replayDropExternalPartitions(ExternalObjectLog log) {
LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
HMSExternalTable hmsTable = (HMSExternalTable) table;
Env.getCurrentEnv().getExtMetaCacheMgr()
.dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames);
hmsTable.setEventUpdateTime(updateTime);
}
public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
@ -1135,42 +975,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return;
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
log.setLastUpdateTime(updateTime);
replayRefreshExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
}
public void replayRefreshExternalPartitions(ExternalObjectLog log) {
LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
log.getPartitionNames());
((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache(
catalog.getId(), db.getFullName(), table.getName(), partitionNames);
((HMSExternalTable) table).setEventUpdateTime(updateTime);
}
public void registerCatalogRefreshListener(Env env) {

View File

@ -521,13 +521,6 @@ public abstract class ExternalCatalog
return null;
}
/**
* External catalog has no cluster semantics.
*/
protected static String getRealTableName(String tableName) {
return ClusterNamespace.getNameFromFullName(tableName);
}
public static ExternalCatalog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ExternalCatalog.class);
@ -546,9 +539,6 @@ public abstract class ExternalCatalog
db.setTableExtCatalog(this);
}
objectCreated = false;
if (this instanceof HMSExternalCatalog) {
((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
}
// TODO: This code is to compatible with older version of metadata.
// Could only remove after all users upgrate to the new version.
if (logType == null) {
@ -569,11 +559,11 @@ public abstract class ExternalCatalog
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
public void dropDatabaseForReplay(String dbName) {
public void dropDatabase(String dbName) {
throw new NotImplementedException("dropDatabase not implemented");
}
public void createDatabaseForReplay(long dbId, String dbName) {
public void createDatabase(long dbId, String dbName) {
throw new NotImplementedException("createDatabase not implemented");
}

View File

@ -0,0 +1,263 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
/**
* <pre>
* ExternalMetaIdMgr is responsible for managing external meta ids.
* Now it just manages the external meta ids of hms events,
* but it will be extended to manage other external meta ids in the future.
* </pre>
* TODO: remove InitCatalogLog and InitDatabaseLog, manage external meta ids at ExternalMetaIdMgr
*/
public class ExternalMetaIdMgr {
private static final Logger LOG = LogManager.getLogger(ExternalMetaIdMgr.class);
public static final long META_ID_FOR_NOT_EXISTS = -1L;
private final Map<Long, CtlMetaIdMgr> idToCtlMgr = Maps.newConcurrentMap();
public ExternalMetaIdMgr() {
}
// invoke this method only on master
public static long nextMetaId() {
return Env.getCurrentEnv().getNextId();
}
// return the db id of the specified db, -1 means not exists
public long getDbId(long catalogId, String dbName) {
DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName);
if (dbMetaIdMgr == null) {
return META_ID_FOR_NOT_EXISTS;
}
return dbMetaIdMgr.dbId;
}
// return the tbl id of the specified tbl, -1 means not exists
public long getTblId(long catalogId, String dbName, String tblName) {
TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName);
if (tblMetaIdMgr == null) {
return META_ID_FOR_NOT_EXISTS;
}
return tblMetaIdMgr.tblId;
}
// return the partition id of the specified partition, -1 means not exists
public long getPartitionId(long catalogId, String dbName,
String tblName, String partitionName) {
PartitionMetaIdMgr partitionMetaIdMgr = getPartitionMetaIdMgr(catalogId, dbName, tblName, partitionName);
if (partitionMetaIdMgr == null) {
return META_ID_FOR_NOT_EXISTS;
}
return partitionMetaIdMgr.partitionId;
}
private @Nullable DbMetaIdMgr getDbMetaIdMgr(long catalogId, String dbName) {
CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.get(catalogId);
if (ctlMetaIdMgr == null) {
return null;
}
return ctlMetaIdMgr.dbNameToMgr.get(dbName);
}
private @Nullable TblMetaIdMgr getTblMetaIdMgr(long catalogId, String dbName, String tblName) {
DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName);
if (dbMetaIdMgr == null) {
return null;
}
return dbMetaIdMgr.tblNameToMgr.get(tblName);
}
private PartitionMetaIdMgr getPartitionMetaIdMgr(long catalogId, String dbName,
String tblName, String partitionName) {
TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName);
if (tblMetaIdMgr == null) {
return null;
}
return tblMetaIdMgr.partitionNameToMgr.get(partitionName);
}
public void replayMetaIdMappingsLog(@NotNull MetaIdMappingsLog log) {
Preconditions.checkNotNull(log);
long catalogId = log.getCatalogId();
CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, CtlMetaIdMgr::new);
for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) {
handleMetaIdMapping(mapping, ctlMetaIdMgr);
}
if (log.isFromHmsEvent()) {
CatalogIf<?> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId());
if (catalogIf != null) {
MetastoreEventsProcessor metastoreEventsProcessor = Env.getCurrentEnv().getMetastoreEventsProcessor();
metastoreEventsProcessor.updateMasterLastSyncedEventId(
(HMSExternalCatalog) catalogIf, log.getLastSyncedEventId());
}
}
}
// no lock needed because the operations are currently serialized
private void handleMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, CtlMetaIdMgr ctlMetaIdMgr) {
MetaIdMappingsLog.OperationType opType = MetaIdMappingsLog.getOperationType(mapping.getOpType());
MetaIdMappingsLog.MetaObjectType objType = MetaIdMappingsLog.getMetaObjectType(mapping.getMetaObjType());
switch (opType) {
case ADD:
handleAddMetaIdMapping(mapping, ctlMetaIdMgr, objType);
break;
case DELETE:
handleDelMetaIdMapping(mapping, ctlMetaIdMgr, objType);
break;
default:
break;
}
}
private static void handleDelMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping,
CtlMetaIdMgr ctlMetaIdMgr,
MetaIdMappingsLog.MetaObjectType objType) {
TblMetaIdMgr tblMetaIdMgr;
DbMetaIdMgr dbMetaIdMgr;
switch (objType) {
case DATABASE:
ctlMetaIdMgr.dbNameToMgr.remove(mapping.getDbName());
break;
case TABLE:
dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName());
if (dbMetaIdMgr != null) {
dbMetaIdMgr.tblNameToMgr.remove(mapping.getTblName());
}
break;
case PARTITION:
dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName());
if (dbMetaIdMgr != null) {
tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr.get(mapping.getTblName());
if (tblMetaIdMgr != null) {
tblMetaIdMgr.partitionNameToMgr.remove(mapping.getPartitionName());
}
}
break;
default:
break;
}
}
private static void handleAddMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping,
CtlMetaIdMgr ctlMetaIdMgr,
MetaIdMappingsLog.MetaObjectType objType) {
DbMetaIdMgr dbMetaIdMgr;
TblMetaIdMgr tblMetaIdMgr;
switch (objType) {
case DATABASE:
ctlMetaIdMgr.dbNameToMgr.put(mapping.getDbName(),
new DbMetaIdMgr(mapping.getId(), mapping.getDbName()));
break;
case TABLE:
dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr
.computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new);
dbMetaIdMgr.tblNameToMgr.put(mapping.getTblName(),
new TblMetaIdMgr(mapping.getId(), mapping.getTblName()));
break;
case PARTITION:
dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr
.computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new);
tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr
.computeIfAbsent(mapping.getTblName(), TblMetaIdMgr::new);
tblMetaIdMgr.partitionNameToMgr.put(mapping.getPartitionName(),
new PartitionMetaIdMgr(mapping.getId(), mapping.getPartitionName()));
break;
default:
break;
}
}
public static class CtlMetaIdMgr {
protected final long catalogId;
protected CtlMetaIdMgr(long catalogId) {
this.catalogId = catalogId;
}
protected Map<String, DbMetaIdMgr> dbNameToMgr = Maps.newConcurrentMap();
}
public static class DbMetaIdMgr {
protected volatile long dbId = META_ID_FOR_NOT_EXISTS;
protected final String dbName;
protected DbMetaIdMgr(long dbId, String dbName) {
this.dbId = dbId;
this.dbName = dbName;
}
protected DbMetaIdMgr(String dbName) {
this.dbName = dbName;
}
protected Map<String, TblMetaIdMgr> tblNameToMgr = Maps.newConcurrentMap();
}
public static class TblMetaIdMgr {
protected volatile long tblId = META_ID_FOR_NOT_EXISTS;
protected final String tblName;
protected TblMetaIdMgr(long tblId, String tblName) {
this.tblId = tblId;
this.tblName = tblName;
}
protected TblMetaIdMgr(String tblName) {
this.tblName = tblName;
}
protected Map<String, PartitionMetaIdMgr> partitionNameToMgr = Maps.newConcurrentMap();
}
public static class PartitionMetaIdMgr {
protected volatile long partitionId = META_ID_FOR_NOT_EXISTS;
protected final String partitionName;
protected PartitionMetaIdMgr(long partitionId, String partitionName) {
this.partitionId = partitionId;
this.partitionName = partitionName;
}
protected PartitionMetaIdMgr(String partitionName) {
this.partitionName = partitionName;
}
}
}
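Reviewer note: to make the lookup path of `ExternalMetaIdMgr` easier to follow, here is a minimal, self-contained sketch of a name-keyed id tree. It is not the PR's implementation. `MetaIdTreeSketch`, `Db`, `Tbl`, and the `add*` helpers are hypothetical names, and the catalog level is flattened into a plain map. As with the single-argument constructors in the diff, a parent that is created implicitly gets the placeholder id -1.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the catalog -> db -> table -> partition id tree.
class MetaIdTreeSketch {
    static final long NOT_EXISTS = -1L;

    static final class Tbl {
        final long id;
        final Map<String, Long> partitions = new ConcurrentHashMap<>();
        Tbl(long id) { this.id = id; }
    }

    static final class Db {
        final long id;
        final Map<String, Tbl> tables = new ConcurrentHashMap<>();
        Db(long id) { this.id = id; }
    }

    private final Map<Long, Map<String, Db>> catalogs = new ConcurrentHashMap<>();

    private Map<String, Db> dbs(long catalogId) {
        return catalogs.computeIfAbsent(catalogId, k -> new ConcurrentHashMap<>());
    }

    void addDb(long catalogId, String db, long id) {
        dbs(catalogId).put(db, new Db(id));
    }

    void addTable(long catalogId, String db, String tbl, long id) {
        // A missing parent database is created as a placeholder with id -1.
        dbs(catalogId).computeIfAbsent(db, k -> new Db(NOT_EXISTS))
                .tables.put(tbl, new Tbl(id));
    }

    void addPartition(long catalogId, String db, String tbl, String partition, long id) {
        // Missing parents are created as placeholders with id -1.
        dbs(catalogId).computeIfAbsent(db, k -> new Db(NOT_EXISTS))
                .tables.computeIfAbsent(tbl, k -> new Tbl(NOT_EXISTS))
                .partitions.put(partition, id);
    }

    long getDbId(long catalogId, String db) {
        Map<String, Db> m = catalogs.get(catalogId);
        Db d = (m == null) ? null : m.get(db);
        return d == null ? NOT_EXISTS : d.id;
    }

    long getPartitionId(long catalogId, String db, String tbl, String partition) {
        // Every level may be absent; any gap yields NOT_EXISTS instead of an exception.
        Map<String, Db> m = catalogs.get(catalogId);
        Db d = (m == null) ? null : m.get(db);
        Tbl t = (d == null) ? null : d.tables.get(tbl);
        if (t == null) {
            return NOT_EXISTS;
        }
        return t.partitions.getOrDefault(partition, NOT_EXISTS);
    }

    public static void main(String[] args) {
        MetaIdTreeSketch tree = new MetaIdTreeSketch();
        tree.addPartition(1L, "db1", "t1", "dt=2024-01-01", 42L);
        System.out.println(tree.getPartitionId(1L, "db1", "t1", "dt=2024-01-01"));
        System.out.println(tree.getDbId(1L, "db1"));
        System.out.println(tree.getPartitionId(1L, "db1", "missing", "dt=2024-01-01"));
    }
}
```

One consequence worth keeping in mind: a caller cannot tell a placeholder parent from a missing one, because both report -1.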

@ -23,23 +23,19 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSCachedClientFactory;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -57,10 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
private static final int MIN_CLIENT_POOL_SIZE = 8;
protected HMSCachedClient client;
// Record the latest synced event id when processing hive events
// Must set to -1 otherwise client.getNextNotification will throw exception
// Reference to https://github.com/apache/doris/issues/18251
private long lastSyncedEventId = -1L;
public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
// broker name for file split and query scan.
@ -132,18 +125,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
}
public String getHiveMetastoreUris() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
}
public String getHiveVersion() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
}
protected List<String> listDatabaseNames() {
return client.getAllDatabases();
}
@Override
protected void initLocalObjectsImpl() {
HiveConf hiveConf = null;
@ -195,13 +176,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
hmsExternalDatabase.getTables().forEach(table -> names.add(table.getName()));
return names;
} else {
return client.getAllTables(getRealTableName(dbName));
return client.getAllTables(ClusterNamespace.getNameFromFullName(dbName));
}
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
return client.tableExists(getRealTableName(dbName), tblName);
return client.tableExists(ClusterNamespace.getNameFromFullName(dbName), tblName);
}
@Override
@ -211,7 +192,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
if (hmsExternalDatabase == null) {
return false;
}
return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
return hmsExternalDatabase.getTable(ClusterNamespace.getNameFromFullName(tblName)).isPresent();
}
public HMSCachedClient getClient() {
@ -219,69 +200,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
return client;
}
public void setLastSyncedEventId(long lastSyncedEventId) {
this.lastSyncedEventId = lastSyncedEventId;
}
public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
throws MetastoreNotificationFetchException {
makeSureInitialized();
long currentEventId = getCurrentEventId();
if (lastSyncedEventId < 0) {
refreshCatalog(hmsExternalCatalog);
// getCurrentEventId() is invoked and the event id saved before refreshing the catalog, to avoid missing events,
// but lastSyncedEventId is set to currentEventId only if the catalog refresh succeeded
lastSyncedEventId = currentEventId;
LOG.info(
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+ "lastSyncedEventId is [{}]",
hmsExternalCatalog.getName(), lastSyncedEventId);
return null;
}
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
if (currentEventId == lastSyncedEventId) {
LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
return null;
}
try {
return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
} catch (MetastoreNotificationFetchException e) {
// Need a fallback to handle this because this error state can not be recovered until restarting FE
if (StringUtils.isNotEmpty(e.getMessage())
&& e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
refreshCatalog(hmsExternalCatalog);
// set lastSyncedEventId to currentEventId after refresh catalog successfully
lastSyncedEventId = currentEventId;
LOG.warn("Notification events are missing, maybe an event can not be handled "
+ "or processing rate is too low, fallback to refresh the catalog");
return null;
}
throw e;
}
}
private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
CatalogLog log = new CatalogLog();
log.setCatalogId(hmsExternalCatalog.getId());
log.setInvalidCache(true);
Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
}
private long getCurrentEventId() {
makeSureInitialized();
CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId();
if (currentNotificationEventId == null) {
LOG.warn("Get currentNotificationEventId is null");
return -1;
}
return currentNotificationEventId.getEventId();
}
@Override
public void dropDatabaseForReplay(String dbName) {
public void dropDatabase(String dbName) {
LOG.debug("drop database [{}]", dbName);
Long dbId = dbNameToId.remove(dbName);
if (dbId == null) {
@ -291,7 +211,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
@Override
public void createDatabaseForReplay(long dbId, String dbName) {
public void createDatabase(long dbId, String dbName) {
LOG.debug("create database [{}]", dbName);
dbNameToId.put(dbName, dbId);
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
@ -317,4 +237,17 @@ public class HMSExternalCatalog extends ExternalCatalog {
catalogProperty.addProperty(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true");
}
}
public String getHiveMetastoreUris() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
}
public String getHiveVersion() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
}
protected List<String> listDatabaseNames() {
return client.getAllDatabases();
}
}

@ -1,67 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@Data
public class InitTableLog implements Writable {
enum Type {
HMS,
ES,
UNKNOWN;
}
@SerializedName(value = "catalogId")
private long catalogId;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "type")
private Type type;
@SerializedName(value = "schema")
protected volatile List<Column> schema;
public InitTableLog() {}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static InitTableLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, InitTableLog.class);
}
}

@ -0,0 +1,274 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.Getter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@Data
public class MetaIdMappingsLog implements Writable {
public static final short OPERATION_TYPE_IGNORE = 0;
public static final short OPERATION_TYPE_ADD = 1;
public static final short OPERATION_TYPE_DELETE = 2;
public static final short META_OBJECT_TYPE_IGNORE = 0;
public static final short META_OBJECT_TYPE_DATABASE = 1;
public static final short META_OBJECT_TYPE_TABLE = 2;
public static final short META_OBJECT_TYPE_PARTITION = 3;
@SerializedName(value = "ctlId")
private long catalogId = -1L;
@SerializedName(value = "fromEvent")
private boolean fromHmsEvent = false;
// The synced event id of master
@SerializedName(value = "lastEventId")
private long lastSyncedEventId = -1L;
@SerializedName(value = "metaIdMappings")
private List<MetaIdMapping> metaIdMappings = Lists.newLinkedList();
public MetaIdMappingsLog() {
}
@Override
public int hashCode() {
return Objects.hash(catalogId, lastSyncedEventId,
metaIdMappings == null ? 0 : Arrays.hashCode(metaIdMappings.toArray()));
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MetaIdMappingsLog)) {
return false;
}
return Objects.equals(this.catalogId, ((MetaIdMappingsLog) obj).catalogId)
&& Objects.equals(this.fromHmsEvent, ((MetaIdMappingsLog) obj).fromHmsEvent)
&& Objects.equals(this.lastSyncedEventId, ((MetaIdMappingsLog) obj).lastSyncedEventId)
&& Objects.equals(this.metaIdMappings, ((MetaIdMappingsLog) obj).metaIdMappings);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static MetaIdMappingsLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, MetaIdMappingsLog.class);
}
public void addMetaIdMapping(MetaIdMapping metaIdMapping) {
this.metaIdMappings.add(metaIdMapping);
}
public void addMetaIdMappings(List<MetaIdMapping> metaIdMappings) {
this.metaIdMappings.addAll(metaIdMappings);
}
public static OperationType getOperationType(short opType) {
switch (opType) {
case OPERATION_TYPE_ADD:
return OperationType.ADD;
case OPERATION_TYPE_DELETE:
return OperationType.DELETE;
default:
return OperationType.IGNORE;
}
}
public static MetaObjectType getMetaObjectType(short metaObjType) {
switch (metaObjType) {
case META_OBJECT_TYPE_DATABASE:
return MetaObjectType.DATABASE;
case META_OBJECT_TYPE_TABLE:
return MetaObjectType.TABLE;
case META_OBJECT_TYPE_PARTITION:
return MetaObjectType.PARTITION;
default:
return MetaObjectType.IGNORE;
}
}
@Getter
public static class MetaIdMapping {
@SerializedName(value = "opType")
private short opType;
@SerializedName(value = "metaObjType")
private short metaObjType;
// name of Database
@SerializedName(value = "dbName")
private String dbName;
// name of Table
@SerializedName(value = "tblName")
private String tblName;
// name of Partition
@SerializedName(value = "pName")
private String partitionName;
// id of Database/Table/Partition
@SerializedName(value = "id")
private long id;
public MetaIdMapping() {}
public MetaIdMapping(short opType,
short metaObjType,
String dbName,
String tblName,
String partitionName,
long id) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = tblName;
this.partitionName = partitionName;
this.id = id;
}
public MetaIdMapping(short opType,
short metaObjType,
String dbName,
String tblName,
String partitionName) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = tblName;
this.partitionName = partitionName;
this.id = -1L;
}
public MetaIdMapping(short opType,
short metaObjType,
String dbName,
String tblName,
long id) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = tblName;
this.partitionName = null;
this.id = id;
}
public MetaIdMapping(short opType,
short metaObjType,
String dbName,
String tblName) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = tblName;
this.partitionName = null;
this.id = -1L;
}
public MetaIdMapping(short opType,
short metaObjType,
String dbName,
long id) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = null;
this.partitionName = null;
this.id = id;
}
public MetaIdMapping(short opType,
short metaObjType,
String dbName) {
this.opType = opType;
this.metaObjType = metaObjType;
this.dbName = dbName;
this.tblName = null;
this.partitionName = null;
this.id = -1L;
}
@Override
public int hashCode() {
return Objects.hash(opType, metaObjType, dbName, tblName, partitionName, id);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MetaIdMapping)) {
return false;
}
return Objects.equals(this.opType, ((MetaIdMapping) obj).opType)
&& Objects.equals(this.metaObjType, ((MetaIdMapping) obj).metaObjType)
&& Objects.equals(this.dbName, ((MetaIdMapping) obj).dbName)
&& Objects.equals(this.tblName, ((MetaIdMapping) obj).tblName)
&& Objects.equals(this.partitionName, ((MetaIdMapping) obj).partitionName)
&& Objects.equals(this.id, ((MetaIdMapping) obj).id);
}
}
public enum OperationType {
IGNORE(OPERATION_TYPE_IGNORE),
// Add a Database/Table/Partition
ADD(OPERATION_TYPE_ADD),
// Delete Database/Table/Partition
DELETE(OPERATION_TYPE_DELETE);
private final short opType;
OperationType(short opType) {
this.opType = opType;
}
public short getOperationType() {
return opType;
}
}
public enum MetaObjectType {
IGNORE(META_OBJECT_TYPE_IGNORE),
DATABASE(META_OBJECT_TYPE_DATABASE),
TABLE(META_OBJECT_TYPE_TABLE),
PARTITION(META_OBJECT_TYPE_PARTITION);
private final short metaObjType;
MetaObjectType(short metaObjType) {
this.metaObjType = metaObjType;
}
public short getMetaObjectType() {
return metaObjType;
}
}
}
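Reviewer note: `MetaIdMappingsLog` persists operation and object types as `short` codes and decodes them back to enums, with unknown codes falling back to `IGNORE`. The sketch below shows that pattern in isolation. `OpCodeSketch`, `Op`, and `fromCode` are hypothetical names for illustration only, and the lookup uses a loop over `values()` rather than the `switch` in the diff.

```java
// Hypothetical sketch of decoding a persisted short code into an enum,
// where any unknown code maps to IGNORE instead of throwing.
class OpCodeSketch {
    enum Op {
        IGNORE((short) 0),
        ADD((short) 1),
        DELETE((short) 2);

        final short code;

        Op(short code) {
            this.code = code;
        }

        static Op fromCode(short code) {
            for (Op op : values()) {
                if (op.code == code) {
                    return op;
                }
            }
            // Codes written by a newer version are skipped rather than rejected.
            return IGNORE;
        }
    }

    public static void main(String[] args) {
        System.out.println(Op.fromCode((short) 2));
        System.out.println(Op.fromCode((short) 99));
    }
}
```

Falling back to `IGNORE` keeps replay tolerant of codes it does not recognise, at the price of silently skipping them.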

@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
@ -109,4 +112,17 @@ public class AddPartitionEvent extends MetastorePartitionEvent {
debugString("Failed to process event"), e);
}
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = Lists.newArrayList();
for (String partitionName : this.getAllPartitionNames()) {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
dbName, tblName, partitionName, ExternalMetaIdMgr.nextMetaId());
metaIdMappings.add(metaIdMapping);
}
return ImmutableList.copyOf(metaIdMappings);
}
}

@ -100,7 +100,8 @@ public class AlterTableEvent extends MetastoreTableEvent {
Env.getCurrentEnv().getCatalogMgr()
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
Env.getCurrentEnv().getCatalogMgr()
.createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
.createExternalTableFromEvent(
tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true);
}
private void processRename() throws DdlException {
@ -118,7 +119,8 @@ public class AlterTableEvent extends MetastoreTableEvent {
Env.getCurrentEnv().getCatalogMgr()
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
Env.getCurrentEnv().getCatalogMgr()
.createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
.createExternalTableFromEvent(
tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true);
}

@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@ -59,4 +62,13 @@ public class CreateDatabaseEvent extends MetastoreEvent {
debugString("Failed to process event"), e);
}
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
dbName, ExternalMetaIdMgr.nextMetaId());
return ImmutableList.of(metaIdMapping);
}
}

@ -19,8 +19,11 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
@ -77,10 +80,19 @@ public class CreateTableEvent extends MetastoreTableEvent {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
Env.getCurrentEnv().getCatalogMgr()
.createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, true);
.createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, eventTime, true);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
}
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
dbName, tblName, ExternalMetaIdMgr.nextMetaId());
return ImmutableList.of(metaIdMapping);
}
}

@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@ -59,4 +61,13 @@ public class DropDatabaseEvent extends MetastoreEvent {
debugString("Failed to process event"), e);
}
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
dbName);
return ImmutableList.of(metaIdMapping);
}
}

@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@ -124,7 +126,7 @@ public class DropPartitionEvent extends MetastorePartitionEvent {
return false;
}
// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
// `that` event can be batched if this event's partitions contains all the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
@ -136,4 +138,17 @@ public class DropPartitionEvent extends MetastorePartitionEvent {
return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = Lists.newArrayList();
for (String partitionName : this.getAllPartitionNames()) {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
dbName, tblName, partitionName);
metaIdMappings.add(metaIdMapping);
}
return ImmutableList.copyOf(metaIdMappings);
}
}
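Reviewer note: on replay, the object-type tag of a delete mapping decides how much of the id tree is removed (see `handleDelMetaIdMapping`), so a partition drop is expected to carry the partition tag. The sketch below illustrates the difference in scope. `DeleteScopeSketch` and its members are hypothetical, and the tree is reduced to nested maps of names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the object type of a delete decides which subtree is removed.
class DeleteScopeSketch {
    enum ObjType { DATABASE, TABLE, PARTITION }

    // db name -> table name -> partition names
    final Map<String, Map<String, Set<String>>> tree = new HashMap<>();

    void addPartition(String db, String tbl, String part) {
        tree.computeIfAbsent(db, k -> new HashMap<>())
                .computeIfAbsent(tbl, k -> new HashSet<>())
                .add(part);
    }

    void delete(ObjType type, String db, String tbl, String part) {
        Map<String, Set<String>> tables = tree.get(db);
        switch (type) {
            case DATABASE:
                // Removes the database and everything below it.
                tree.remove(db);
                break;
            case TABLE:
                if (tables != null) {
                    tables.remove(tbl);
                }
                break;
            case PARTITION:
                if (tables != null && tables.get(tbl) != null) {
                    tables.get(tbl).remove(part);
                }
                break;
            default:
                break;
        }
    }

    boolean hasPartition(String db, String tbl, String part) {
        Map<String, Set<String>> tables = tree.get(db);
        return tables != null && tables.get(tbl) != null && tables.get(tbl).contains(part);
    }

    public static void main(String[] args) {
        DeleteScopeSketch s = new DeleteScopeSketch();
        s.addPartition("db1", "t1", "p1");
        s.addPartition("db1", "t1", "p2");
        s.delete(ObjType.PARTITION, "db1", "t1", "p1");
        System.out.println(s.hasPartition("db1", "t1", "p2")); // sibling partition survives
        s.delete(ObjType.DATABASE, "db1", "t1", "p1");
        System.out.println(s.hasPartition("db1", "t1", "p2")); // whole database is gone
    }
}
```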

@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
@ -89,14 +91,23 @@ public class DropTableEvent extends MetastoreTableEvent {
return false;
}
/**
/*
* Check if `that` event is a rename event, a rename event can not be batched
* because the process of `that` event will change the reference relation of this table,
* otherwise it can be batched because this event is a drop-table event
* and the process of this event will drop the whole table,
* and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false
* */
*/
MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
return !thatTblEvent.willChangeTableName();
}
@Override
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
dbName, tblName);
return ImmutableList.of(metaIdMapping);
}
}

@ -17,8 +17,10 @@
package org.apache.doris.datasource.hive.event;
import org.apache.doris.datasource.MetaIdMappingsLog;
import org.apache.doris.datasource.hive.HMSCachedClient;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -227,6 +229,13 @@ public abstract class MetastoreEvent {
return name.toString();
}
/**
* Create a MetaIdMapping list from the event if the event is a create/add/drop event
*/
protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() {
return ImmutableList.of();
}
@Override
public String toString() {
return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);

@ -18,7 +18,9 @@
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.MetaIdMappingsLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -77,23 +79,39 @@ public class MetastoreEventFactory implements EventFactory {
for (NotificationEvent event : events) {
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName));
}
return createBatchEvents(catalogName, metastoreEvents);
List<MetastoreEvent> mergedEvents = mergeEvents(catalogName, metastoreEvents);
if (Env.getCurrentEnv().isMaster()) {
logMetaIdMappings(hmsExternalCatalog.getId(), events.get(events.size() - 1).getEventId(), mergedEvents);
}
return mergedEvents;
}
private void logMetaIdMappings(long catalogId, long lastSyncedEventId, List<MetastoreEvent> mergedEvents) {
MetaIdMappingsLog log = new MetaIdMappingsLog();
log.setCatalogId(catalogId);
log.setFromHmsEvent(true);
log.setLastSyncedEventId(lastSyncedEventId);
for (MetastoreEvent event : mergedEvents) {
log.addMetaIdMappings(event.transferToMetaIdMappings());
}
Env.getCurrentEnv().getExternalMetaIdMgr().replayMetaIdMappingsLog(log);
Env.getCurrentEnv().getEditLog().logMetaIdMappingsLog(log);
}
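Reviewer note: the PR description says the master handles the meta ids of a whole batch first and writes a single meta-id-mappings log, which the other FE nodes replay. A minimal sketch of that idea follows, under heavy simplification: `BatchLogSketch`, `Master`, `Node`, and the in-memory `journal` list are hypothetical stand-ins for the edit log, and only "add" mappings are modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one journal entry per batch of events, replayed by followers.
class BatchLogSketch {
    static final class Mapping {
        final String name;
        final long id;

        Mapping(String name, long id) {
            this.name = name;
            this.id = id;
        }
    }

    static class Node {
        final Map<String, Long> ids = new HashMap<>();

        void replay(List<Mapping> log) {
            for (Mapping m : log) {
                ids.put(m.name, m.id);
            }
        }
    }

    static final class Master extends Node {
        private long nextId = 1000L;
        final List<List<Mapping>> journal = new ArrayList<>();

        void commitBatch(List<String> createdObjects) {
            // Only the master allocates ids, so every node ends up with the same values.
            List<Mapping> log = new ArrayList<>();
            for (String name : createdObjects) {
                log.add(new Mapping(name, nextId++));
            }
            replay(log);       // apply locally first
            journal.add(log);  // then persist a single entry for the whole batch
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Node follower = new Node();
        master.commitBatch(Arrays.asList("db1.t1", "db1.t2", "db1.t3"));
        for (List<Mapping> entry : master.journal) {
            follower.replay(entry);
        }
        System.out.println(master.journal.size());
        System.out.println(master.ids.equals(follower.ids));
    }
}
```

The point of the design is that the number of journal writes grows with the number of batches, not with the number of events.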
/**
* Merge events to reduce the time spent on event processing. Currently this mainly handles MetastoreTableEvent,
* because merging MetastoreTableEvent is simple and cost-effective.
* For example, consider the following events:
*
* <pre>
* event1: alter table db1.t1 add partition p1;
* event2: alter table db1.t1 drop partition p2;
* event3: alter table db1.t2 add partition p3;
* event4: alter table db2.t3 rename to t4;
* event5: drop table db1.t1;
*
* </pre>
* Only `event3 event4 event5` will be retained; the other events will be skipped.
* */
public List<MetastoreEvent> createBatchEvents(String catalogName, List<MetastoreEvent> events) {
public List<MetastoreEvent> mergeEvents(String catalogName, List<MetastoreEvent> events) {
List<MetastoreEvent> eventsCopy = Lists.newArrayList(events);
Map<MetastoreTableEvent.TableKey, List<Integer>> indexMap = Maps.newLinkedHashMap();
for (int i = 0; i < events.size(); i++) {
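Reviewer note: the Javadoc example for `mergeEvents` (five events, of which only event3, event4 and event5 survive) can be reproduced with a much simpler rule than the real implementation. In the sketch below, a drop-table event discards earlier events on the same table unless they are renames. `MergeSketch`, `Event`, and `Type` are hypothetical, and the rule is a deliberate simplification, not the algorithm in `MetastoreEventFactory`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a later DROP_TABLE makes earlier events on the same table redundant.
class MergeSketch {
    enum Type { ADD_PARTITION, DROP_PARTITION, RENAME_TABLE, DROP_TABLE }

    static final class Event {
        final int id;
        final Type type;
        final String table;

        Event(int id, Type type, String table) {
            this.id = id;
            this.type = type;
            this.table = table;
        }
    }

    static List<Event> merge(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (e.type == Type.DROP_TABLE) {
                // Renames are never discarded because they change which table a name refers to.
                kept.removeIf(prev -> prev.table.equals(e.table) && prev.type != Type.RENAME_TABLE);
            }
            kept.add(e);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1, Type.ADD_PARTITION, "db1.t1"),
                new Event(2, Type.DROP_PARTITION, "db1.t1"),
                new Event(3, Type.ADD_PARTITION, "db1.t2"),
                new Event(4, Type.RENAME_TABLE, "db2.t3"),
                new Event(5, Type.DROP_TABLE, "db1.t1"));
        for (Event e : merge(events)) {
            System.out.println("event" + e.id);
        }
    }
}
```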

@ -18,13 +18,22 @@
package org.apache.doris.datasource.hive.event;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.HMSClientException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.OriginStatement;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
@ -35,6 +44,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A metastore event is an instance of the class
@ -68,6 +78,13 @@ public class MetastoreEventsProcessor extends MasterDaemon {
// event factory which is used to get or create MetastoreEvents
private final MetastoreEventFactory metastoreEventFactory;
// manages the lastSyncedEventId of hms catalogs
// a HashMap is fine because all operations run in one thread
private final Map<Long, Long> lastSyncedEventIdMap = Maps.newHashMap();
// manages the masterLastSyncedEventId of hms catalogs
private final Map<Long, Long> masterLastSyncedEventIdMap = Maps.newHashMap();
private boolean isRunning;
public MetastoreEventsProcessor() {
@ -76,49 +93,6 @@ public class MetastoreEventsProcessor extends MasterDaemon {
this.isRunning = false;
}
/**
* Fetch the next batch of NotificationEvents from metastore. The default batch size is
* <code>{@link Config#hms_events_batch_size_per_rpc}</code>
*/
private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
if (response == null) {
return Collections.emptyList();
}
return response.getEvents();
}
private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
for (MetastoreEvent event : events) {
try {
event.process();
} catch (HMSClientException hmsClientException) {
if (hmsClientException.getCause() != null
&& hmsClientException.getCause() instanceof NoSuchObjectException) {
LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException);
} else {
hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
throw hmsClientException;
}
} catch (Exception e) {
hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
throw e;
}
}
}
/**
* Process the given list of notification events. Useful for tests which provide a list of events
*/
private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
//transfer
List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
doExecute(metastoreEvents, hmsExternalCatalog);
hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId());
}
@Override
protected void runAfterCatalogReady() {
if (isRunning) {
@ -157,6 +131,189 @@ public class MetastoreEventsProcessor extends MasterDaemon {
}
}
/**
* Fetch the next batch of NotificationEvents from metastore. The default batch size is
* <code>{@link Config#hms_events_batch_size_per_rpc}</code>
*/
private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) throws Exception {
LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
NotificationEventResponse response;
if (Env.getCurrentEnv().isMaster()) {
response = getNextEventResponseForMaster(hmsExternalCatalog);
} else {
response = getNextEventResponseForSlave(hmsExternalCatalog);
}
if (response == null) {
return Collections.emptyList();
}
return response.getEvents();
}
private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
for (MetastoreEvent event : events) {
try {
event.process();
} catch (HMSClientException hmsClientException) {
if (hmsClientException.getCause() != null
&& hmsClientException.getCause() instanceof NoSuchObjectException) {
LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException);
} else {
updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1);
throw hmsClientException;
}
} catch (Exception e) {
updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1);
throw e;
}
}
}
/**
* Process the given list of notification events. Useful for tests which provide a list of events
*/
private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
//transfer
List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
doExecute(metastoreEvents, hmsExternalCatalog);
updateLastSyncedEventId(hmsExternalCatalog, events.get(events.size() - 1).getEventId());
}
private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog)
throws MetastoreNotificationFetchException {
long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog);
long currentEventId = getCurrentHmsEventId(hmsExternalCatalog);
if (lastSyncedEventId < 0) {
refreshCatalogForMaster(hmsExternalCatalog);
// getCurrentHmsEventId() is invoked before refreshing the catalog so that no events are missed,
// but lastSyncedEventId is set to currentEventId only if the catalog refresh succeeds
updateLastSyncedEventId(hmsExternalCatalog, currentEventId);
LOG.info(
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+ "lastSyncedEventId is [{}]",
hmsExternalCatalog.getName(), lastSyncedEventId);
return null;
}
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {}, lastSyncedEventId is {}",
hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
if (currentEventId == lastSyncedEventId) {
LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
return null;
}
try {
return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
Config.hms_events_batch_size_per_rpc, null);
} catch (MetastoreNotificationFetchException e) {
// Need a fallback to handle this because this error state can not be recovered until restarting FE
if (StringUtils.isNotEmpty(e.getMessage())
&& e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
refreshCatalogForMaster(hmsExternalCatalog);
// set lastSyncedEventId to currentEventId after refresh catalog successfully
updateLastSyncedEventId(hmsExternalCatalog, currentEventId);
LOG.warn("Notification events are missing, maybe an event can not be handled "
+ "or processing rate is too low, fallback to refresh the catalog");
return null;
}
throw e;
}
}
private NotificationEventResponse getNextEventResponseForSlave(HMSExternalCatalog hmsExternalCatalog)
throws Exception {
long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog);
long masterLastSyncedEventId = getMasterLastSyncedEventId(hmsExternalCatalog);
// do nothing if masterLastSyncedEventId has not been synced
if (masterLastSyncedEventId == -1L) {
LOG.info("LastSyncedEventId of master has not been synced on catalog [{}]", hmsExternalCatalog.getName());
return null;
}
// do nothing if lastSyncedEventId equals masterLastSyncedEventId
if (lastSyncedEventId == masterLastSyncedEventId) {
LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
return null;
}
if (lastSyncedEventId < 0) {
refreshCatalogForSlave(hmsExternalCatalog);
// Use masterLastSyncedEventId to avoid missing events
updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId);
LOG.info(
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+ "lastSyncedEventId is [{}]",
hmsExternalCatalog.getName(), lastSyncedEventId);
return null;
}
LOG.debug("Catalog [{}] getNextEventResponse, masterLastSyncedEventId is {}, lastSyncedEventId is {}",
hmsExternalCatalog.getName(), masterLastSyncedEventId, lastSyncedEventId);
// For slave FE nodes, only fetch events whose id does not exceed masterLastSyncedEventId
int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId),
Config.hms_events_batch_size_per_rpc);
try {
return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, maxEventSize, null);
} catch (MetastoreNotificationFetchException e) {
// Need a fallback to handle this because this error state can not be recovered until restarting FE
if (StringUtils.isNotEmpty(e.getMessage())
&& e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
refreshCatalogForMaster(hmsExternalCatalog);
// set lastSyncedEventId to masterLastSyncedEventId after refreshing the catalog successfully
updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId);
LOG.warn("Notification events are missing, maybe an event can not be handled "
+ "or processing rate is too low, fallback to refresh the catalog");
return null;
}
throw e;
}
}
private long getCurrentHmsEventId(HMSExternalCatalog hmsExternalCatalog) {
CurrentNotificationEventId currentNotificationEventId = hmsExternalCatalog.getClient()
.getCurrentNotificationEventId();
if (currentNotificationEventId == null) {
LOG.warn("Get currentNotificationEventId is null");
return -1L;
}
return currentNotificationEventId.getEventId();
}
private long getLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) {
// Returns -1 if no id has been recorded yet; otherwise client.getNextNotification will throw an exception
// See https://github.com/apache/doris/issues/18251
return lastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L);
}
private void updateLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) {
lastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId);
}
private long getMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) {
return masterLastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L);
}
public void updateMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) {
masterLastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId);
}
private void refreshCatalogForMaster(HMSExternalCatalog hmsExternalCatalog) {
CatalogLog log = new CatalogLog();
log.setCatalogId(hmsExternalCatalog.getId());
log.setInvalidCache(true);
Env.getCurrentEnv().getCatalogMgr().replayRefreshCatalog(log);
}
private void refreshCatalogForSlave(HMSExternalCatalog hmsExternalCatalog) throws Exception {
// Transfer to master to refresh catalog
String sql = "REFRESH CATALOG " + hmsExternalCatalog.getName();
OriginStatement originStmt = new OriginStatement(sql, 0);
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, new ConnectContext(),
RedirectStatus.FORWARD_WITH_SYNC, false);
LOG.debug("Transfer to master to refresh catalog, stmt: {}", sql);
masterOpExecutor.execute();
}
public static MessageDeserializer getMessageDeserializer(String messageFormat) {
if (messageFormat != null && messageFormat.startsWith(GZIP_JSON_FORMAT_PREFIX)) {
return GZIP_JSON_MESSAGE_DESERIALIZER;

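The follower-side decision in `getNextEventResponseForSlave` comes down to a small function of three numbers. The sketch below is an illustration under assumptions: `followerFetchSize` is a hypothetical helper, not part of the PR, and returning 0 stands in for the `return null` branches (including the first-round branch, where the real code refreshes the catalog and adopts the master's id instead of fetching).

```java
// Hypothetical helper mirroring the branch structure of getNextEventResponseForSlave.
class SlaveFetchSketch {
    // Returns how many events a follower would request, or 0 when it fetches nothing this round.
    static int followerFetchSize(long lastSyncedEventId, long masterLastSyncedEventId, int batchSizePerRpc) {
        if (masterLastSyncedEventId == -1L) {
            return 0; // the master's progress has not been replayed on this node yet
        }
        if (lastSyncedEventId == masterLastSyncedEventId) {
            return 0; // already caught up with the master
        }
        if (lastSyncedEventId < 0) {
            return 0; // first round: refresh the catalog and adopt the master's id instead
        }
        long gap = masterLastSyncedEventId - lastSyncedEventId;
        if (gap <= 0) {
            return 0; // assumption: a follower that is ahead of the master has nothing to fetch
        }
        // Never read past the master's position, and never exceed the per-RPC batch size.
        return (int) Math.min(gap, (long) batchSizePerRpc);
    }

    public static void main(String[] args) {
        System.out.println(followerFetchSize(100L, 130L, 500));  // 30
        System.out.println(followerFetchSize(100L, 1000L, 500)); // 500
    }
}
```

Taking the minimum on `long` values before narrowing to `int` avoids an overflow when the gap is very large; the PR casts the gap first, which behaves the same for realistic gaps.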

@ -42,7 +42,7 @@ import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.datasource.MetaIdMappingsLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteLog;
import org.apache.doris.job.base.AbstractJob;
@ -746,19 +746,18 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_INIT_EXTERNAL_TABLE: {
data = InitTableLog.read(in);
isRead = true;
break;
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
case OperationType.OP_INIT_EXTERNAL_TABLE:
case OperationType.OP_DROP_EXTERNAL_TABLE:
case OperationType.OP_CREATE_EXTERNAL_TABLE:
case OperationType.OP_DROP_EXTERNAL_DB:
case OperationType.OP_CREATE_EXTERNAL_DB:
case OperationType.OP_ADD_EXTERNAL_PARTITIONS:
case OperationType.OP_DROP_EXTERNAL_PARTITIONS:
case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS:
case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
isRead = true;
break;
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
data = ExternalObjectLog.read(in);
isRead = true;
@ -906,6 +905,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ADD_META_ID_MAPPINGS: {
data = MetaIdMappingsLog.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);


@ -51,6 +51,7 @@ import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.MetaIdMappingsLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteLog;
import org.apache.doris.job.base.AbstractJob;
@ -1011,38 +1012,24 @@ public class EditLog {
break;
}
case OperationType.OP_DROP_EXTERNAL_TABLE: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayDropExternalTable(log);
break;
}
case OperationType.OP_CREATE_EXTERNAL_TABLE: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayCreateExternalTableFromEvent(log);
break;
}
case OperationType.OP_DROP_EXTERNAL_DB: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayDropExternalDatabase(log);
break;
}
case OperationType.OP_CREATE_EXTERNAL_DB: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayCreateExternalDatabase(log);
break;
}
case OperationType.OP_ADD_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayAddExternalPartitions(log);
break;
}
case OperationType.OP_DROP_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayDropExternalPartitions(log);
break;
}
case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayRefreshExternalPartitions(log);
break;
}
case OperationType.OP_CREATE_WORKLOAD_GROUP: {
@ -1165,6 +1152,10 @@ public class EditLog {
env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
break;
}
case OperationType.OP_ADD_META_ID_MAPPINGS: {
env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) journal.getData());
break;
}
case OperationType.OP_LOG_UPDATE_ROWS:
case OperationType.OP_LOG_NEW_PARTITION_LOADED:
case OperationType.OP_LOG_ALTER_COLUMN_STATS: {
@ -1886,26 +1877,32 @@ public class EditLog {
logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
}
@Deprecated
public void logDropExternalTable(ExternalObjectLog log) {
logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
}
@Deprecated
public void logCreateExternalTable(ExternalObjectLog log) {
logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log);
}
@Deprecated
public void logDropExternalDatabase(ExternalObjectLog log) {
logEdit(OperationType.OP_DROP_EXTERNAL_DB, log);
}
@Deprecated
public void logCreateExternalDatabase(ExternalObjectLog log) {
logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log);
}
@Deprecated
public void logAddExternalPartitions(ExternalObjectLog log) {
logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log);
}
@Deprecated
public void logDropExternalPartitions(ExternalObjectLog log) {
logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log);
}
@ -2005,6 +2002,10 @@ public class EditLog {
logEdit(OperationType.OP_INSERT_OVERWRITE, log);
}
public void logMetaIdMappingsLog(MetaIdMappingsLog log) {
logEdit(OperationType.OP_ADD_META_ID_MAPPINGS, log);
}
public String getNotReadyReason() {
if (journal == null) {
return "journal is null";


@ -288,12 +288,19 @@ public class OperationType {
public static final short OP_ADD_CONSTRAINT = 346;
public static final short OP_DROP_CONSTRAINT = 347;
@Deprecated
public static final short OP_DROP_EXTERNAL_TABLE = 350;
@Deprecated
public static final short OP_DROP_EXTERNAL_DB = 351;
@Deprecated
public static final short OP_CREATE_EXTERNAL_TABLE = 352;
@Deprecated
public static final short OP_CREATE_EXTERNAL_DB = 353;
@Deprecated
public static final short OP_ADD_EXTERNAL_PARTITIONS = 354;
@Deprecated
public static final short OP_DROP_EXTERNAL_PARTITIONS = 355;
@Deprecated
public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
public static final short OP_ALTER_USER = 400;
@ -364,6 +371,8 @@ public class OperationType {
public static final short OP_LOG_ALTER_COLUMN_STATS = 464;
public static final short OP_ADD_META_ID_MAPPINGS = 470;
/**
* Get opcode name by op code.
**/


@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ExternalMetaIdMgrTest {
@Test
public void testReplayMetaIdMappingsLog() {
ExternalMetaIdMgr mgr = new ExternalMetaIdMgr();
MetaIdMappingsLog log1 = new MetaIdMappingsLog();
log1.setCatalogId(1L);
log1.setFromHmsEvent(false);
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
"db1", ExternalMetaIdMgr.nextMetaId()));
mgr.replayMetaIdMappingsLog(log1);
Assertions.assertNotEquals(-1L, mgr.getDbId(1L, "db1"));
MetaIdMappingsLog log2 = new MetaIdMappingsLog();
log2.setCatalogId(1L);
log2.setFromHmsEvent(false);
log2.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
"db1"));
mgr.replayMetaIdMappingsLog(log2);
Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
MetaIdMappingsLog log3 = new MetaIdMappingsLog();
log3.setCatalogId(1L);
log3.setFromHmsEvent(false);
log3.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
"db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
mgr.replayMetaIdMappingsLog(log3);
Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
Assertions.assertNotEquals(-1L, mgr.getTblId(1L, "db1", "tbl1"));
MetaIdMappingsLog log4 = new MetaIdMappingsLog();
log4.setCatalogId(1L);
log4.setFromHmsEvent(false);
log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
"db1", "tbl1"));
log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
"db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId()));
mgr.replayMetaIdMappingsLog(log4);
Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
Assertions.assertEquals(-1L, mgr.getTblId(1L, "db1", "tbl1"));
Assertions.assertNotEquals(-1L, mgr.getPartitionId(1L, "db1", "tbl1", "p1"));
}
}
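
The behaviour this test expects (a table id can exist while its database id is absent, and dropping a table also forgets its partitions) can be reproduced with a small stand-alone model. This is a sketch under assumptions, not the PR's `ExternalMetaIdMgr`: the class name, the flat maps and the `/`-joined keys are hypothetical, and names are assumed not to contain `/`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a meta id manager; ids are looked up by name path.
class MetaIdSketch {
    static final long NOT_FOUND = -1L;

    private final Map<String, Long> dbIds = new HashMap<>();
    private final Map<String, Long> tblIds = new HashMap<>();
    private final Map<String, Long> partitionIds = new HashMap<>();
    private long nextId = 1L;

    long nextMetaId() {
        return nextId++;
    }

    void addDb(long catalogId, String db, long id) {
        dbIds.put(key(catalogId, db), id);
    }

    void addTable(long catalogId, String db, String tbl, long id) {
        tblIds.put(key(catalogId, db, tbl), id);
    }

    void addPartition(long catalogId, String db, String tbl, String partition, long id) {
        partitionIds.put(key(catalogId, db, tbl, partition), id);
    }

    // Dropping a database also forgets every table and partition below it.
    void dropDb(long catalogId, String db) {
        String prefix = key(catalogId, db) + "/";
        dbIds.remove(key(catalogId, db));
        tblIds.keySet().removeIf(k -> k.startsWith(prefix));
        partitionIds.keySet().removeIf(k -> k.startsWith(prefix));
    }

    // Dropping a table also forgets its partitions.
    void dropTable(long catalogId, String db, String tbl) {
        String prefix = key(catalogId, db, tbl) + "/";
        tblIds.remove(key(catalogId, db, tbl));
        partitionIds.keySet().removeIf(k -> k.startsWith(prefix));
    }

    long getDbId(long catalogId, String db) {
        return dbIds.getOrDefault(key(catalogId, db), NOT_FOUND);
    }

    long getTblId(long catalogId, String db, String tbl) {
        return tblIds.getOrDefault(key(catalogId, db, tbl), NOT_FOUND);
    }

    long getPartitionId(long catalogId, String db, String tbl, String partition) {
        return partitionIds.getOrDefault(key(catalogId, db, tbl, partition), NOT_FOUND);
    }

    private static String key(long catalogId, String... names) {
        return catalogId + "/" + String.join("/", names);
    }
}
```

Because every node applies the same add and delete operations in journal order, the resulting name-to-id maps end up identical on every FE, which is the property the PR description says the analysis jobs need.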


@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class MetaIdMappingsLogTest {
@Test
public void testSerialization() throws Exception {
// 1. Write objects to file
MetaIdMappingsLog log1 = new MetaIdMappingsLog();
Path path = Files.createFile(Paths.get("./metaIdMappingsLogTest.txt"));
try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) {
log1.setFromHmsEvent(true);
log1.setLastSyncedEventId(-1L);
log1.setCatalogId(1L);
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
"db1", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
"db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
"db1", "tbl2", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
"db2"));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
"db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_ADD,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
"db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
"db2"));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
"db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
MetaIdMappingsLog.OPERATION_TYPE_DELETE,
MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
"db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId()));
log1.write(dos);
dos.flush();
} catch (Throwable throwable) {
throwable.printStackTrace();
Files.deleteIfExists(path);
Assertions.fail();
}
// 2. Read objects from file
MetaIdMappingsLog log2;
try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) {
log2 = MetaIdMappingsLog.read(dis);
Assertions.assertEquals(log1, log2);
} catch (Throwable throwable) {
throwable.printStackTrace();
Assertions.fail();
} finally {
Files.deleteIfExists(path);
}
}
}
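
The write-then-read check in this test can also be done without touching the file system. The sketch below is an assumption-laden illustration: `Mapping` is a hypothetical four-field record with a hand-written binary layout, and it does not reflect how `MetaIdMappingsLog` actually serializes itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical record used only to demonstrate an in-memory round trip.
class MappingRoundTripSketch {
    static final class Mapping {
        final short opType;
        final short objectType;
        final String dbName;
        final long id;

        Mapping(short opType, short objectType, String dbName, long id) {
            this.opType = opType;
            this.objectType = objectType;
            this.dbName = dbName;
            this.id = id;
        }

        void write(DataOutputStream out) throws IOException {
            out.writeShort(opType);
            out.writeShort(objectType);
            out.writeUTF(dbName);
            out.writeLong(id);
        }

        // Fields are read back in exactly the order they were written.
        static Mapping read(DataInputStream in) throws IOException {
            return new Mapping(in.readShort(), in.readShort(), in.readUTF(), in.readLong());
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Mapping)) {
                return false;
            }
            Mapping other = (Mapping) o;
            return opType == other.opType && objectType == other.objectType
                    && dbName.equals(other.dbName) && id == other.id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(opType, objectType, dbName, id);
        }
    }

    // Serializes to a byte array and deserializes from it again.
    static Mapping roundTrip(Mapping mapping) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                mapping.write(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return Mapping.read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An in-memory buffer sidesteps one fragility of the file-based version: `Files.createFile` throws `FileAlreadyExistsException` if a previous run left `./metaIdMappingsLogTest.txt` behind.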


@ -454,7 +454,7 @@ public class MetastoreEventFactoryTest {
for (int j = 0; j < 1000; j++) {
events.add(producer.produceOneEvent(j));
}
List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events);
List<MetastoreEvent> mergedEvents = factory.mergeEvents(testCtl, events);
for (MetastoreEvent event : events) {
processEvent(validateCatalog, event);