[Fix](multi-catalog) Fix sync hms event failed. (#19555)
This is similar to #19344. Because the HMS metadata is sometimes newer than the HMS events, invoking org.apache.doris.datasource.hive.PooledHiveMetaStoreClient#getTable for a table that no longer exists throws an error, and the event cannot be handled.
This commit is contained in:
@ -892,12 +892,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
return;
|
||||
}
|
||||
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(),
|
||||
(ExternalTable) table, partitionNames);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
replayAddExternalPartitions(log);
|
||||
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
|
||||
}
|
||||
|
||||
@ -919,8 +920,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
try {
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
|
||||
} catch (HMSClientException e) {
|
||||
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
|
||||
db.getFullName(), table.getName());
|
||||
}
|
||||
}
|
||||
|
||||
public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
|
||||
@ -981,23 +988,32 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
|
||||
public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
List<String> partitionNames)
|
||||
List<String> partitionNames, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!(catalog instanceof ExternalCatalog)) {
|
||||
throw new DdlException("Only support ExternalCatalog");
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
|
||||
@ -146,8 +146,7 @@ public class ExternalMetaCacheMgr {
|
||||
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames,
|
||||
((HMSExternalTable) table).getPartitionColumnTypes(), true);
|
||||
metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames, true);
|
||||
}
|
||||
LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
|
||||
}
|
||||
|
||||
@ -571,8 +571,8 @@ public class HiveMetaStoreCache {
|
||||
}
|
||||
|
||||
public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames,
|
||||
List<Type> partitionColumnTypes, boolean invalidPartitionCache) {
|
||||
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes);
|
||||
boolean invalidPartitionCache) {
|
||||
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
|
||||
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
|
||||
if (partitionValues == null) {
|
||||
return;
|
||||
@ -595,17 +595,22 @@ public class HiveMetaStoreCache {
|
||||
idToPartitionItemBefore.remove(partitionId);
|
||||
partitionValuesMap.remove(partitionId);
|
||||
List<UniqueId> uniqueIds = idToUniqueIdsMapBefore.remove(partitionId);
|
||||
if (key.types.size() > 1) {
|
||||
for (UniqueId uniqueId : uniqueIds) {
|
||||
for (UniqueId uniqueId : uniqueIds) {
|
||||
if (uidToPartitionRangeBefore != null) {
|
||||
Range<PartitionKey> range = uidToPartitionRangeBefore.remove(uniqueId);
|
||||
rangeToIdBefore.remove(range);
|
||||
if (range != null) {
|
||||
rangeToIdBefore.remove(range);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (UniqueId uniqueId : uniqueIds) {
|
||||
|
||||
if (singleUidToColumnRangeMapBefore != null) {
|
||||
Range<ColumnBound> range = singleUidToColumnRangeMapBefore.remove(uniqueId);
|
||||
singleColumnRangeMapBefore.remove(range);
|
||||
if (range != null) {
|
||||
singleColumnRangeMapBefore.remove(range);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (invalidPartitionCache) {
|
||||
invalidatePartitionCache(dbName, tblName, partitionName);
|
||||
}
|
||||
|
||||
@ -87,7 +87,7 @@ public class AlterPartitionEvent extends MetastoreTableEvent {
|
||||
} else {
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
|
||||
Lists.newArrayList(partitionNameAfter));
|
||||
Lists.newArrayList(partitionNameAfter), true);
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
|
||||
@ -128,6 +128,9 @@ public abstract class MetastoreEvent {
|
||||
|
||||
/**
|
||||
* Process the information available in the NotificationEvent.
|
||||
* Better not to call (direct/indirect) apis of {@link org.apache.doris.datasource.hive.PooledHiveMetaStoreClient}
|
||||
* while handling HMS events (see https://github.com/apache/doris/pull/19120).
|
||||
* If such a call is unavoidable, add a fallback strategy.
|
||||
*/
|
||||
protected abstract void process() throws MetastoreNotificationException;
|
||||
|
||||
|
||||
@ -22,8 +22,10 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.HMSClientException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
|
||||
@ -92,6 +94,14 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
for (MetastoreEvent event : events) {
|
||||
try {
|
||||
event.process();
|
||||
} catch (HMSClientException hmsClientException) {
|
||||
if (hmsClientException.getCause() != null
|
||||
&& hmsClientException.getCause() instanceof NoSuchObjectException) {
|
||||
LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException);
|
||||
} else {
|
||||
hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
|
||||
throw hmsClientException;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
|
||||
throw e;
|
||||
|
||||
@ -466,7 +466,7 @@ public class CatalogMgrTest extends TestWithFeService {
|
||||
Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache);
|
||||
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
|
||||
metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
|
||||
partitionValueCacheKey.getTypes(), false);
|
||||
false);
|
||||
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
|
||||
Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0);
|
||||
}
|
||||
@ -496,7 +496,7 @@ public class CatalogMgrTest extends TestWithFeService {
|
||||
Lists.newArrayList("m=1", "m=2"), metaStoreCache);
|
||||
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
|
||||
metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=1", "m=2"),
|
||||
partitionValueCacheKey.getTypes(), false);
|
||||
false);
|
||||
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
|
||||
Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user