[Fix](multi-catalog) Fix sync hms event failed when start FE soon. (#19344)
* [Fix](multi-catalog) Fix sync hms event failed when start FE soon after. * [Fix](multi-catalog) Fix sync hms event failed when start FE soon after. --------- Co-authored-by: wangxiangyu@360shuke.com <wangxiangyu@360shuke.com>
This commit is contained in:
@ -69,7 +69,7 @@ public class RefreshManager {
|
||||
refreshInternalCtlIcebergTable(stmt, env);
|
||||
} else {
|
||||
// Process external catalog table refresh
|
||||
env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName);
|
||||
env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false);
|
||||
}
|
||||
LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName);
|
||||
}
|
||||
|
||||
@ -581,7 +581,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
|
||||
public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -591,12 +592,18 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (table instanceof ExternalTable) {
|
||||
((ExternalTable) table).unsetObjectCreated();
|
||||
@ -630,7 +637,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
}
|
||||
|
||||
public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
|
||||
public void dropExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -640,12 +648,18 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
@ -695,7 +709,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName);
|
||||
}
|
||||
|
||||
public void createExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
|
||||
public void createExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -705,12 +720,18 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table != null) {
|
||||
throw new DdlException("Table " + tableName + " has exist in db " + dbName);
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Table " + tableName + " has exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
@ -742,7 +763,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void dropExternalDatabase(String dbName, String catalogName) throws DdlException {
|
||||
public void dropExternalDatabase(String dbName, String catalogName, boolean ignoreIfNotExists) throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -752,7 +773,10 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
@ -785,7 +809,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void createExternalDatabase(String dbName, String catalogName) throws DdlException {
|
||||
public void createExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -795,7 +819,10 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db != null) {
|
||||
throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
@ -822,7 +849,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
|
||||
public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
|
||||
boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
@ -833,12 +861,18 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
@ -872,7 +906,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
.addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
|
||||
}
|
||||
|
||||
public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
|
||||
public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
|
||||
boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
@ -883,12 +918,18 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
|
||||
@ -79,7 +79,7 @@ public class AddPartitionEvent extends MetastoreTableEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
|
||||
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -79,9 +79,11 @@ public class AlterPartitionEvent extends MetastoreTableEvent {
|
||||
catalogName, dbName, tblName, partitionNameBefore, partitionNameAfter);
|
||||
if (isRename) {
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameBefore));
|
||||
.dropExternalPartitions(catalogName, dbName, tblName,
|
||||
Lists.newArrayList(partitionNameBefore), true);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.addExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameAfter));
|
||||
.addExternalPartitions(catalogName, dbName, tblName,
|
||||
Lists.newArrayList(partitionNameAfter), true);
|
||||
} else {
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
|
||||
|
||||
@ -73,9 +73,9 @@ public class AlterTableEvent extends MetastoreTableEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
|
||||
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
|
||||
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
|
||||
}
|
||||
|
||||
private void processRename() throws DdlException {
|
||||
@ -91,9 +91,9 @@ public class AlterTableEvent extends MetastoreTableEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
|
||||
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
|
||||
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
|
||||
|
||||
}
|
||||
|
||||
@ -118,7 +118,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
|
||||
}
|
||||
//The scope of refresh can be narrowed in the future
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
|
||||
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
|
||||
} catch (Exception e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -48,7 +48,7 @@ public class CreateDatabaseEvent extends MetastoreEvent {
|
||||
try {
|
||||
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.createExternalDatabase(dbName, catalogName);
|
||||
.createExternalDatabase(dbName, catalogName, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -59,16 +59,7 @@ public class CreateTableEvent extends MetastoreTableEvent {
|
||||
protected void process() throws MetastoreNotificationException {
|
||||
try {
|
||||
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
|
||||
boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
|
||||
.externalTableExistInLocal(dbName, hmsTbl.getTableName(), catalogName);
|
||||
if (hasExist) {
|
||||
infoLog(
|
||||
"CreateExternalTable canceled,because table has exist,"
|
||||
+ "catalogName:[{}],dbName:[{}],tableName:[{}]",
|
||||
catalogName, dbName, tblName);
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName);
|
||||
Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -48,7 +48,7 @@ public class DropDatabaseEvent extends MetastoreEvent {
|
||||
try {
|
||||
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalDatabase(dbName, catalogName);
|
||||
.dropExternalDatabase(dbName, catalogName, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -79,7 +79,7 @@ public class DropPartitionEvent extends MetastoreTableEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
|
||||
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -59,7 +59,7 @@ public class DropTableEvent extends MetastoreTableEvent {
|
||||
protected void process() throws MetastoreNotificationException {
|
||||
try {
|
||||
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tableName);
|
||||
Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
|
||||
Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
@ -66,7 +66,7 @@ public class InsertEvent extends MetastoreTableEvent {
|
||||
* the file cache of this table,
|
||||
* but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it.
|
||||
*/
|
||||
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(dbName, tblName, catalogName);
|
||||
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(dbName, tblName, catalogName, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"));
|
||||
|
||||
Reference in New Issue
Block a user