[Fix](multi-catalog) Fallback to refresh catalog when hms events are missing (#21333)
Fix #20227, the implementation has some problems and can not catch event-missing-exception.
This commit is contained in:
@ -32,6 +32,7 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
@ -203,9 +204,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
|
||||
throws MetastoreNotificationFetchException {
|
||||
makeSureInitialized();
|
||||
long currentEventId = getCurrentEventId();
|
||||
if (lastSyncedEventId < 0) {
|
||||
lastSyncedEventId = getCurrentEventId();
|
||||
refreshCatalog(hmsExternalCatalog);
|
||||
// invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events
|
||||
// but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog
|
||||
lastSyncedEventId = currentEventId;
|
||||
LOG.info(
|
||||
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
|
||||
+ "lastSyncedEventId is [{}]",
|
||||
@ -213,7 +217,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
return null;
|
||||
}
|
||||
|
||||
long currentEventId = getCurrentEventId();
|
||||
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
|
||||
hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
|
||||
if (currentEventId == lastSyncedEventId) {
|
||||
@ -223,11 +226,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
|
||||
try {
|
||||
return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
|
||||
} catch (IllegalStateException e) {
|
||||
} catch (MetastoreNotificationFetchException e) {
|
||||
// Need a fallback to handle this because this error state can not be recovered until restarting FE
|
||||
if (HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE.equals(e.getMessage())) {
|
||||
lastSyncedEventId = getCurrentEventId();
|
||||
if (StringUtils.isNotEmpty(e.getMessage())
|
||||
&& e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
|
||||
refreshCatalog(hmsExternalCatalog);
|
||||
// set lastSyncedEventId to currentEventId after refresh catalog successfully
|
||||
lastSyncedEventId = currentEventId;
|
||||
LOG.warn("Notification events are missing, maybe an event can not be handled "
|
||||
+ "or processing rate is too low, fallback to refresh the catalog");
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user