[fix](hive)Modify the Hive notification event processing method when using meta cache and add parameters to the Hive catalog. (#39239) (#39865)
bp #39239 Co-authored-by: daidai <2017501503@qq.com>
This commit is contained in:
@ -1755,9 +1755,8 @@ public class Env {
|
||||
domainResolver.start();
|
||||
// fe disk updater
|
||||
feDiskUpdater.start();
|
||||
if (Config.enable_hms_events_incremental_sync) {
|
||||
metastoreEventsProcessor.start();
|
||||
}
|
||||
|
||||
metastoreEventsProcessor.start();
|
||||
|
||||
dnsCache.start();
|
||||
|
||||
|
||||
@ -655,13 +655,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table != null) {
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Table " + tableName + " has exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
long tblId;
|
||||
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
|
||||
if (hmsCatalog.getUseMetaCache().get()) {
|
||||
@ -712,13 +705,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
if (!(catalog instanceof ExternalCatalog)) {
|
||||
throw new DdlException("Only support create ExternalCatalog databases");
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db != null) {
|
||||
if (!ignoreIfExists) {
|
||||
throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
|
||||
long dbId;
|
||||
|
||||
@ -289,11 +289,11 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
}
|
||||
|
||||
if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
|
||||
LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
|
||||
getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
|
||||
useMetaCache = Optional.of(true);
|
||||
}
|
||||
// if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
|
||||
// LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
|
||||
// getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
|
||||
// useMetaCache = Optional.of(true);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -451,13 +451,14 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
|
||||
@Override
|
||||
public void unregisterTable(String tableName) {
|
||||
makeSureInitialized();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("create table [{}]", tableName);
|
||||
}
|
||||
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.invalidate(tableName);
|
||||
metaCache.invalidate(tableName, Util.genIdByName(getQualifiedName(tableName)));
|
||||
}
|
||||
} else {
|
||||
Long tableId = tableNameToId.remove(tableName);
|
||||
@ -480,6 +481,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
// Only used for sync hive metastore event
|
||||
@Override
|
||||
public boolean registerTable(TableIf tableIf) {
|
||||
makeSureInitialized();
|
||||
long tableId = tableIf.getId();
|
||||
String tableName = tableIf.getName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -487,11 +489,13 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
}
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.updateCache(tableName, (T) tableIf);
|
||||
metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName)));
|
||||
}
|
||||
} else {
|
||||
tableNameToId.put(tableName, tableId);
|
||||
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
|
||||
if (!tableNameToId.containsKey(tableName)) {
|
||||
tableNameToId.put(tableName, tableId);
|
||||
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
|
||||
}
|
||||
}
|
||||
setLastUpdateTime(System.currentTimeMillis());
|
||||
return true;
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.security.authentication.AuthenticationConfig;
|
||||
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogProperty;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.ExternalDatabase;
|
||||
@ -73,6 +74,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
@Getter
|
||||
private HadoopAuthenticator authenticator;
|
||||
|
||||
private int hmsEventsBatchSizePerRpc = -1;
|
||||
private boolean enableHmsEventsIncrementalSync = false;
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public HMSExternalCatalog() {
|
||||
catalogProperty = new CatalogProperty(null, null);
|
||||
@ -100,6 +105,19 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
throw new DdlException(
|
||||
"The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond);
|
||||
}
|
||||
Map<String, String> properties = catalogProperty.getProperties();
|
||||
if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
|
||||
enableHmsEventsIncrementalSync =
|
||||
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
|
||||
} else {
|
||||
enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
|
||||
}
|
||||
|
||||
if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
|
||||
hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
|
||||
} else {
|
||||
hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
|
||||
}
|
||||
|
||||
// check the dfs.ha properties
|
||||
// 'dfs.nameservices'='your-nameservice',
|
||||
@ -212,7 +230,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
if (useMetaCache.get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.invalidate(dbName);
|
||||
metaCache.invalidate(dbName, Util.genIdByName(getQualifiedName(dbName)));
|
||||
}
|
||||
} else {
|
||||
Long dbId = dbNameToId.remove(dbName);
|
||||
@ -233,7 +251,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType);
|
||||
if (useMetaCache.get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.updateCache(dbName, db);
|
||||
metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName)));
|
||||
}
|
||||
} else {
|
||||
dbNameToId.put(dbName, dbId);
|
||||
@ -266,4 +284,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
public String getHiveVersion() {
|
||||
return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
|
||||
}
|
||||
|
||||
public int getHmsEventsBatchSizePerRpc() {
|
||||
return hmsEventsBatchSizePerRpc;
|
||||
}
|
||||
|
||||
public boolean isEnableHmsEventsIncrementalSync() {
|
||||
return enableHmsEventsIncrementalSync;
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,9 +145,7 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {
|
||||
// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
|
||||
// else just remove `that` event's relevant partitions
|
||||
for (String partitionName : getAllPartitionNames()) {
|
||||
if (thatPartitionEvent instanceof AddPartitionEvent) {
|
||||
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
|
||||
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
|
||||
if (thatPartitionEvent instanceof DropPartitionEvent) {
|
||||
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,7 +28,7 @@ import java.util.List;
|
||||
*/
|
||||
public class IgnoredEvent extends MetastoreEvent {
|
||||
private IgnoredEvent(NotificationEvent event, String catalogName) {
|
||||
super(event, catalogName);
|
||||
super(event);
|
||||
}
|
||||
|
||||
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
|
||||
|
||||
@ -24,8 +24,6 @@ import org.apache.doris.common.DdlException;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -33,13 +31,11 @@ import java.util.List;
|
||||
* MetastoreEvent for INSERT event type
|
||||
*/
|
||||
public class InsertEvent extends MetastoreTableEvent {
|
||||
private final Table hmsTbl;
|
||||
|
||||
// for test
|
||||
public InsertEvent(long eventId, String catalogName, String dbName,
|
||||
String tblName) {
|
||||
super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT);
|
||||
this.hmsTbl = null;
|
||||
}
|
||||
|
||||
private InsertEvent(NotificationEvent event, String catalogName) {
|
||||
@ -47,14 +43,6 @@ public class InsertEvent extends MetastoreTableEvent {
|
||||
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
|
||||
Preconditions
|
||||
.checkNotNull(event.getMessage(), debugString("Event message is null"));
|
||||
try {
|
||||
InsertMessage insertMessage =
|
||||
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
|
||||
.getInsertMessage(event.getMessage());
|
||||
hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj());
|
||||
} catch (Exception ex) {
|
||||
throw new MetastoreNotificationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) {
|
||||
|
||||
@ -78,6 +78,18 @@ public abstract class MetastoreEvent {
|
||||
this.event = null;
|
||||
}
|
||||
|
||||
// for IgnoredEvent
|
||||
protected MetastoreEvent(NotificationEvent event) {
|
||||
this.event = event;
|
||||
this.metastoreNotificationEvent = event;
|
||||
this.eventId = -1;
|
||||
this.eventTime = -1L;
|
||||
this.catalogName = null;
|
||||
this.dbName = null;
|
||||
this.tblName = null;
|
||||
this.eventType = null;
|
||||
}
|
||||
|
||||
protected MetastoreEvent(NotificationEvent event, String catalogName) {
|
||||
this.event = event;
|
||||
// Some events that we don't care about, dbName may be empty
|
||||
|
||||
@ -46,6 +46,9 @@ public class MetastoreEventFactory implements EventFactory {
|
||||
String catalogName) {
|
||||
Preconditions.checkNotNull(event.getEventType());
|
||||
MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("catalogName = {}, Event = {}", catalogName, event.toString());
|
||||
}
|
||||
switch (metastoreEventType) {
|
||||
case CREATE_TABLE:
|
||||
return CreateTableEvent.getEvents(event, catalogName);
|
||||
|
||||
@ -115,6 +115,9 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
if (catalog instanceof HMSExternalCatalog) {
|
||||
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
|
||||
if (!hmsExternalCatalog.isEnableHmsEventsIncrementalSync()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
|
||||
if (!events.isEmpty()) {
|
||||
@ -125,6 +128,8 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
} catch (MetastoreNotificationFetchException e) {
|
||||
LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
|
||||
} catch (Exception ex) {
|
||||
hmsExternalCatalog.onRefreshCache(true);
|
||||
updateLastSyncedEventId(hmsExternalCatalog, -1);
|
||||
LOG.warn("Failed to process hive metastore [{}] events .",
|
||||
hmsExternalCatalog.getName(), ex);
|
||||
}
|
||||
@ -147,7 +152,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
response = getNextEventResponseForSlave(hmsExternalCatalog);
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
if (response == null || response.getEventsSize() == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return response.getEvents();
|
||||
@ -207,9 +212,15 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
return null;
|
||||
}
|
||||
|
||||
int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc();
|
||||
try {
|
||||
return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
|
||||
Config.hms_events_batch_size_per_rpc, null);
|
||||
NotificationEventResponse notificationEventResponse =
|
||||
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null);
|
||||
LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId = {},"
|
||||
+ "batchSize = {}, getEventsSize = {}", hmsExternalCatalog.getName(), lastSyncedEventId,
|
||||
currentEventId, batchSize, notificationEventResponse.getEvents().size());
|
||||
|
||||
return notificationEventResponse;
|
||||
} catch (MetastoreNotificationFetchException e) {
|
||||
// Need a fallback to handle this because this error state can not be recovered until restarting FE
|
||||
if (StringUtils.isNotEmpty(e.getMessage())
|
||||
|
||||
@ -90,7 +90,7 @@ public class MetaCache<T> {
|
||||
return name == null ? Optional.empty() : getMetaObj(name, id);
|
||||
}
|
||||
|
||||
public void updateCache(String objName, T obj) {
|
||||
public void updateCache(String objName, T obj, long id) {
|
||||
metaObjCache.put(objName, Optional.of(obj));
|
||||
namesCache.asMap().compute("", (k, v) -> {
|
||||
if (v == null) {
|
||||
@ -100,9 +100,10 @@ public class MetaCache<T> {
|
||||
return v;
|
||||
}
|
||||
});
|
||||
idToName.put(id, objName);
|
||||
}
|
||||
|
||||
public void invalidate(String objName) {
|
||||
public void invalidate(String objName, long id) {
|
||||
namesCache.asMap().compute("", (k, v) -> {
|
||||
if (v == null) {
|
||||
return Lists.newArrayList();
|
||||
@ -112,11 +113,13 @@ public class MetaCache<T> {
|
||||
}
|
||||
});
|
||||
metaObjCache.invalidate(objName);
|
||||
idToName.remove(id);
|
||||
}
|
||||
|
||||
public void invalidateAll() {
|
||||
namesCache.invalidateAll();
|
||||
metaObjCache.invalidateAll();
|
||||
idToName.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -28,5 +28,6 @@ public class HMSProperties {
|
||||
// required
|
||||
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
|
||||
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
|
||||
|
||||
public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
|
||||
public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user