[feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
Poll metastore for create/alter/drop operations on database, table, partition events at a given frequency. By observing such events, we can take appropriate action on the (refresh/invalidate/add/remove) so that represents the latest information available in metastore. We keep track of the last synced event id in each polling iteration so the next batch can be requested appropriately.
This commit is contained in:
@ -214,4 +214,6 @@ public interface DatabaseIf<T extends TableIf> {
|
||||
}
|
||||
return (OlapTable) table;
|
||||
}
|
||||
|
||||
void dropTable(String tableName);
|
||||
}
|
||||
|
||||
@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.EsExternalCatalog;
|
||||
import org.apache.doris.datasource.ExternalMetaCacheMgr;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
|
||||
import org.apache.doris.deploy.DeployManager;
|
||||
import org.apache.doris.deploy.impl.AmbariDeployManager;
|
||||
import org.apache.doris.deploy.impl.K8sDeployManager;
|
||||
@ -316,6 +317,7 @@ public class Env {
|
||||
private DeleteHandler deleteHandler;
|
||||
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
|
||||
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
|
||||
private MetastoreEventsProcessor metastoreEventsProcessor;
|
||||
|
||||
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
|
||||
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
|
||||
@ -549,6 +551,7 @@ public class Env {
|
||||
this.deleteHandler = new DeleteHandler();
|
||||
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
|
||||
this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
|
||||
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
|
||||
|
||||
this.replayedJournalId = new AtomicLong(0L);
|
||||
this.isElectable = false;
|
||||
@ -1402,6 +1405,10 @@ public class Env {
|
||||
if (Config.enable_fqdn_mode) {
|
||||
fqdnManager.start();
|
||||
}
|
||||
if (Config.enable_hms_events_incremental_sync) {
|
||||
metastoreEventsProcessor.start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// start threads that should running on all FE
|
||||
|
||||
@ -258,4 +258,9 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
|
||||
|
||||
@Override
|
||||
public void gsonPostProcess() throws IOException {}
|
||||
|
||||
@Override
|
||||
public void dropTable(String tableName) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,4 +170,14 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
|
||||
idToTbl.put(tbl.getId(), tbl);
|
||||
tableNameToId.put(tbl.getName(), tbl.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropTable(String tableName) {
|
||||
LOG.debug("drop table [{}]", tableName);
|
||||
Long tableId = tableNameToId.remove(tableName);
|
||||
if (tableId == null) {
|
||||
LOG.warn("drop table [{}] failed", tableName);
|
||||
}
|
||||
idToTbl.remove(tableId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1935,5 +1935,23 @@ public class Config extends ConfigBase {
|
||||
*/
|
||||
@ConfField(mutable = true)
|
||||
public static boolean enable_func_pushdown = true;
|
||||
|
||||
/**
|
||||
* If set to true, doris will automatically synchronize hms metadata to the cache in fe.
|
||||
*/
|
||||
@ConfField(masterOnly = true)
|
||||
public static boolean enable_hms_events_incremental_sync = false;
|
||||
|
||||
/**
|
||||
* Maximum number of events to poll in each RPC.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int hms_events_batch_size_per_rpc = 500;
|
||||
|
||||
/**
|
||||
* HMS polling interval in milliseconds.
|
||||
*/
|
||||
@ConfField(masterOnly = true)
|
||||
public static int hms_events_polling_interval_ms = 20000;
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Resource;
|
||||
import org.apache.doris.catalog.Resource.ReferenceType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
@ -439,13 +440,17 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
* Refresh the catalog meta and write the meta log.
|
||||
*/
|
||||
public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
|
||||
CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
refreshCatalog(log);
|
||||
}
|
||||
|
||||
public void refreshCatalog(CatalogLog log) {
|
||||
writeLock();
|
||||
try {
|
||||
CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
|
||||
}
|
||||
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
|
||||
replayRefreshCatalog(log);
|
||||
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log);
|
||||
} finally {
|
||||
@ -481,7 +486,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
/**
|
||||
* Reply for refresh catalog event.
|
||||
*/
|
||||
public void replayRefreshCatalog(CatalogLog log) throws DdlException {
|
||||
public void replayRefreshCatalog(CatalogLog log) {
|
||||
writeLock();
|
||||
try {
|
||||
unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache());
|
||||
@ -554,6 +559,42 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
}
|
||||
|
||||
public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
}
|
||||
if (!(catalog instanceof ExternalCatalog)) {
|
||||
throw new DdlException("Only support drop ExternalCatalog Tables");
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
replayDropExternalTable(log);
|
||||
Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
|
||||
}
|
||||
|
||||
public void replayDropExternalTable(ExternalObjectLog log) {
|
||||
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
|
||||
log.getTableId());
|
||||
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
|
||||
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
|
||||
ExternalTable table = db.getTableForReplay(log.getTableId());
|
||||
db.dropTable(table.getName());
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
|
||||
@ -295,6 +295,9 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
db.setTableExtCatalog(this);
|
||||
}
|
||||
objectCreated = false;
|
||||
if (this instanceof HMSExternalCatalog) {
|
||||
((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
|
||||
}
|
||||
}
|
||||
|
||||
public void addDatabaseForTest(ExternalDatabase db) {
|
||||
|
||||
@ -25,14 +25,20 @@ import org.apache.doris.catalog.HdfsResource;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
import org.apache.doris.catalog.external.HMSExternalDatabase;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -42,8 +48,12 @@ import java.util.Map;
|
||||
* External catalog for hive metastore compatible data sources.
|
||||
*/
|
||||
public class HMSExternalCatalog extends ExternalCatalog {
|
||||
private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);
|
||||
|
||||
private static final int MAX_CLIENT_POOL_SIZE = 8;
|
||||
protected PooledHiveMetaStoreClient client;
|
||||
// Record the latest synced event id when processing hive events
|
||||
private long lastSyncedEventId;
|
||||
|
||||
/**
|
||||
* Default constructor for HMSExternalCatalog.
|
||||
@ -170,4 +180,48 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
return tmpSchema;
|
||||
}
|
||||
|
||||
public void setLastSyncedEventId(long lastSyncedEventId) {
|
||||
this.lastSyncedEventId = lastSyncedEventId;
|
||||
}
|
||||
|
||||
public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
|
||||
throws MetastoreNotificationFetchException {
|
||||
makeSureInitialized();
|
||||
if (lastSyncedEventId < 0) {
|
||||
lastSyncedEventId = getCurrentEventId();
|
||||
refreshCatalog(hmsExternalCatalog);
|
||||
LOG.info(
|
||||
"First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
|
||||
+ "lastSyncedEventId is [{}]",
|
||||
hmsExternalCatalog.getName(), lastSyncedEventId);
|
||||
return null;
|
||||
}
|
||||
|
||||
long currentEventId = getCurrentEventId();
|
||||
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
|
||||
hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
|
||||
if (currentEventId == lastSyncedEventId) {
|
||||
LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
|
||||
return null;
|
||||
}
|
||||
return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
|
||||
}
|
||||
|
||||
private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
|
||||
CatalogLog log = new CatalogLog();
|
||||
log.setCatalogId(hmsExternalCatalog.getId());
|
||||
log.setInvalidCache(true);
|
||||
Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
|
||||
}
|
||||
|
||||
private long getCurrentEventId() {
|
||||
makeSureInitialized();
|
||||
CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId();
|
||||
if (currentNotificationEventId == null) {
|
||||
LOG.warn("Get currentNotificationEventId is null");
|
||||
return -1;
|
||||
}
|
||||
return currentNotificationEventId.getEventId();
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.HMSResource;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
|
||||
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -29,8 +30,10 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.MetaException;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -145,6 +148,30 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
public CurrentNotificationEventId getCurrentNotificationEventId() {
|
||||
try (CachedClient client = getClient()) {
|
||||
return client.client.getCurrentNotificationEventId();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to fetch current notification event id", e);
|
||||
throw new MetastoreNotificationFetchException(
|
||||
"Failed to get current notification event id. msg: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public NotificationEventResponse getNextNotification(long lastEventId,
|
||||
int maxEvents,
|
||||
IMetaStoreClient.NotificationFilter filter)
|
||||
throws MetastoreNotificationFetchException {
|
||||
try (CachedClient client = getClient()) {
|
||||
return client.client.getNextNotification(lastEventId, maxEvents, filter);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
|
||||
throw new MetastoreNotificationFetchException(
|
||||
"Failed to get next notification based on last event id: " + lastEventId + ". msg: " + e
|
||||
.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private class CachedClient implements AutoCloseable {
|
||||
private final IMetaStoreClient client;
|
||||
|
||||
|
||||
@ -0,0 +1,89 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* MetastoreEvent for DROP_TABLE event type
|
||||
*/
|
||||
public class DropTableEvent extends MetastoreTableEvent {
|
||||
private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
|
||||
private final String dbName;
|
||||
private final String tableName;
|
||||
|
||||
private DropTableEvent(NotificationEvent event,
|
||||
String catalogName) {
|
||||
super(event, catalogName);
|
||||
Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
|
||||
JSONDropTableMessage dropTableMessage =
|
||||
(JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
|
||||
.getDropTableMessage(event.getMessage());
|
||||
try {
|
||||
dbName = dropTableMessage.getDB();
|
||||
tableName = dropTableMessage.getTable();
|
||||
} catch (Exception e) {
|
||||
throw new MetastoreNotificationException(debugString(
|
||||
"Could not parse event message. "
|
||||
+ "Check if %s is set to true in metastore configuration",
|
||||
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<MetastoreEvent> getEvents(NotificationEvent event,
|
||||
String catalogName) {
|
||||
return Lists.newArrayList(new DropTableEvent(event, catalogName));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean existInCache() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canBeSkipped() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected boolean isSupported() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process() throws MetastoreNotificationException {
|
||||
try {
|
||||
LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
|
||||
tableName);
|
||||
Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
|
||||
catalogName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Factory interface to generate a {@link MetastoreEvent} from a {@link NotificationEvent} object.
|
||||
*/
|
||||
public interface EventFactory {
|
||||
|
||||
List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent hmsEvent,
|
||||
String catalogName) throws MetastoreNotificationException;
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An event type which is ignored. Useful for unsupported metastore event types
|
||||
*/
|
||||
public class IgnoredEvent extends MetastoreEvent {
|
||||
protected IgnoredEvent(NotificationEvent event, String catalogName) {
|
||||
super(event, catalogName);
|
||||
}
|
||||
|
||||
private static List<MetastoreEvent> getEvents(NotificationEvent event,
|
||||
String catalogName) {
|
||||
return Lists.newArrayList(new IgnoredEvent(event, catalogName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,203 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
/**
|
||||
* Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
|
||||
* process a NotificationEvent received from metastore.
|
||||
*/
|
||||
public abstract class MetastoreEvent {
|
||||
private static final Logger LOG = LogManager.getLogger(MetastoreEvent.class);
|
||||
// String.format compatible string to prepend event id and type
|
||||
private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";
|
||||
|
||||
// logger format compatible string to prepend to a log formatted message
|
||||
private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
|
||||
|
||||
// the notification received from metastore which is processed by this
|
||||
protected final NotificationEvent event;
|
||||
|
||||
// dbName from the event
|
||||
protected final String dbName;
|
||||
|
||||
// tblName from the event
|
||||
protected final String tblName;
|
||||
|
||||
// eventId of the event. Used instead of calling getter on event everytime
|
||||
private final long eventId;
|
||||
|
||||
// eventType from the NotificationEvent
|
||||
private final MetastoreEventType eventType;
|
||||
|
||||
// Actual notificationEvent object received from Metastore
|
||||
protected final NotificationEvent metastoreNotificationEvent;
|
||||
|
||||
protected final String catalogName;
|
||||
|
||||
protected MetastoreEvent(NotificationEvent event, String catalogName) {
|
||||
this.event = event;
|
||||
this.dbName = event.getDbName();
|
||||
this.tblName = event.getTableName();
|
||||
this.eventId = event.getEventId();
|
||||
this.eventType = MetastoreEventType.from(event.getEventType());
|
||||
this.metastoreNotificationEvent = event;
|
||||
this.catalogName = catalogName;
|
||||
}
|
||||
|
||||
public long getEventId() {
|
||||
return eventId;
|
||||
}
|
||||
|
||||
public MetastoreEventType getEventType() {
|
||||
return eventType;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public String getTblName() {
|
||||
return tblName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given event can be batched into this event. Default behavior is
|
||||
* to return false which can be overridden by a sub-class.
|
||||
* The current version is relatively simple to process batch events, so all that need to be processed are true.
|
||||
*
|
||||
* @param event The event under consideration to be batched into this event.
|
||||
* @return false if event cannot be batched into this event; otherwise true.
|
||||
*/
|
||||
protected boolean canBeBatched(MetastoreEvent event) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given event into the batch of events represented by this event. Default
|
||||
* implementation is to return null. Sub-classes must override this method to
|
||||
* implement batching.
|
||||
*
|
||||
* @param event The event which needs to be added to the batch.
|
||||
* @return The batch event which represents all the events batched into this event
|
||||
* until now including the given event.
|
||||
*/
|
||||
protected MetastoreEvent addToBatchEvents(MetastoreEvent event) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
protected boolean existInCache() throws MetastoreNotificationException {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of events represented by this event. For most events this is 1.
|
||||
* In case of batch events this could be more than 1.
|
||||
*/
|
||||
protected int getNumberOfEvents() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
|
||||
* some events because they do not affect query results.
|
||||
*
|
||||
* @return true if this event can be skipped.
|
||||
*/
|
||||
protected boolean canBeSkipped() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the current version of FE supports processing of some events, some events are reserved,
|
||||
* and may be processed later version.
|
||||
*/
|
||||
protected boolean isSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the information available in the NotificationEvent.
|
||||
*/
|
||||
protected abstract void process() throws MetastoreNotificationException;
|
||||
|
||||
/**
|
||||
* Helper method to get debug string with helpful event information prepended to the
|
||||
* message. This can be used to generate helpful exception messages
|
||||
*
|
||||
* @param msgFormatString String value to be used in String.format() for the given message
|
||||
* @param args args to the <code>String.format()</code> for the given msgFormatString
|
||||
*/
|
||||
protected String debugString(String msgFormatString, Object... args) {
|
||||
String formatString = STR_FORMAT_EVENT_ID_TYPE + msgFormatString;
|
||||
Object[] formatArgs = getLogFormatArgs(args);
|
||||
return String.format(formatString, formatArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to generate the format args after prepending the event id and type
|
||||
*/
|
||||
private Object[] getLogFormatArgs(Object[] args) {
|
||||
Object[] formatArgs = new Object[args.length + 2];
|
||||
formatArgs[0] = getEventId();
|
||||
formatArgs[1] = getEventType();
|
||||
int i = 2;
|
||||
for (Object arg : args) {
|
||||
formatArgs[i] = arg;
|
||||
i++;
|
||||
}
|
||||
return formatArgs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs at info level the given log formatted string and its args. The log formatted
|
||||
* string should have {} pair at the appropriate location in the string for each arg
|
||||
* value provided. This method prepends the event id and event type before logging the
|
||||
* message. No-op if the log level is not at INFO
|
||||
*/
|
||||
protected void infoLog(String logFormattedStr, Object... args) {
|
||||
if (!LOG.isInfoEnabled()) {
|
||||
return;
|
||||
}
|
||||
String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
|
||||
Object[] formatArgs = getLogFormatArgs(args);
|
||||
LOG.info(formatString, formatArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to infoLog excepts logs at debug level
|
||||
*/
|
||||
protected void debugLog(String logFormattedStr, Object... args) {
|
||||
if (!LOG.isDebugEnabled()) {
|
||||
return;
|
||||
}
|
||||
String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
|
||||
Object[] formatArgs = getLogFormatArgs(args);
|
||||
LOG.debug(formatString, formatArgs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,81 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Factory class to create various MetastoreEvents.
|
||||
*/
|
||||
public class MetastoreEventFactory implements EventFactory {
|
||||
private static final Logger LOG = LogManager.getLogger(MetastoreEventFactory.class);
|
||||
|
||||
@Override
|
||||
public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event,
|
||||
String catalogName) {
|
||||
Preconditions.checkNotNull(event.getEventType());
|
||||
MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
|
||||
switch (metastoreEventType) {
|
||||
case DROP_TABLE:
|
||||
return DropTableEvent.getEvents(event, catalogName);
|
||||
default:
|
||||
// ignore all the unknown events by creating a IgnoredEvent
|
||||
return Lists.newArrayList(new IgnoredEvent(event, catalogName));
|
||||
}
|
||||
}
|
||||
|
||||
List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
|
||||
List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
|
||||
|
||||
for (NotificationEvent event : events) {
|
||||
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName()));
|
||||
}
|
||||
|
||||
List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream()
|
||||
.filter(MetastoreEvent::isSupported)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (tobeProcessEvents.isEmpty()) {
|
||||
LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return createBatchEvents(tobeProcessEvents);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create batch event tasks according to HivePartitionName to facilitate subsequent parallel processing.
|
||||
* For ADD_PARTITION and DROP_PARTITION, we directly override any events before that partition.
|
||||
* For a partition, it is meaningless to process any events before the drop partition.
|
||||
*/
|
||||
List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
|
||||
// now do nothing
|
||||
return events;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,68 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
/**
|
||||
* Currently we only support handling some events.
|
||||
*/
|
||||
public enum MetastoreEventType {
|
||||
CREATE_TABLE("CREATE_TABLE"),
|
||||
DROP_TABLE("DROP_TABLE"),
|
||||
ALTER_TABLE("ALTER_TABLE"),
|
||||
CREATE_DATABASE("CREATE_DATABASE"),
|
||||
DROP_DATABASE("DROP_DATABASE"),
|
||||
ALTER_DATABASE("ALTER_DATABASE"),
|
||||
ADD_PARTITION("ADD_PARTITION"),
|
||||
ALTER_PARTITION("ALTER_PARTITION"),
|
||||
ALTER_PARTITIONS("ALTER_PARTITIONS"),
|
||||
DROP_PARTITION("DROP_PARTITION"),
|
||||
INSERT("INSERT"),
|
||||
INSERT_PARTITIONS("INSERT_PARTITIONS"),
|
||||
ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
|
||||
COMMIT_TXN("COMMIT_TXN"),
|
||||
ABORT_TXN("ABORT_TXN"),
|
||||
OTHER("OTHER");
|
||||
|
||||
private final String eventType;
|
||||
|
||||
MetastoreEventType(String msEventType) {
|
||||
this.eventType = msEventType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return eventType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the MetastoreEventType from a given string value of event from Metastore's
|
||||
* NotificationEvent.eventType. If none of the supported MetastoreEventTypes match,
|
||||
* return OTHER
|
||||
*
|
||||
* @param eventType EventType value from the {@link org.apache.hadoop.hive.metastore.api.NotificationEvent}
|
||||
*/
|
||||
public static MetastoreEventType from(String eventType) {
|
||||
for (MetastoreEventType metastoreEventType : values()) {
|
||||
if (metastoreEventType.eventType.equalsIgnoreCase(eventType)) {
|
||||
return metastoreEventType;
|
||||
}
|
||||
}
|
||||
return OTHER;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,151 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
|
||||
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A metastore event is a instance of the class
|
||||
* {@link NotificationEvent}. Metastore can be
|
||||
* configured, to work with Listeners which are called on various DDL operations like
|
||||
* create/alter/drop operations on database, table, partition etc. Each event has a unique
|
||||
* incremental id and the generated events are be fetched from Metastore to get
|
||||
* incremental updates to the metadata stored in Hive metastore using the the public API
|
||||
* <code>get_next_notification</code> These events could be generated by external
|
||||
* Metastore clients like Apache Hive or Apache Spark configured to talk with the same metastore.
|
||||
* <p>
|
||||
* This class is used to poll metastore for such events at a given frequency. By observing
|
||||
* such events, we can take appropriate action on the {@link org.apache.doris.datasource.hive.HiveMetaStoreCache}
|
||||
* (refresh/invalidate/add/remove) so that represents the latest information
|
||||
* available in metastore. We keep track of the last synced event id in each polling
|
||||
* iteration so the next batch can be requested appropriately. The current batch size is
|
||||
* constant and set to {@link org.apache.doris.common.Config#hms_events_batch_size_per_rpc}.
|
||||
*/
|
||||
public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class);
|
||||
public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
|
||||
"hive.metastore.notifications.add.thrift.objects";
|
||||
|
||||
// for deserializing from JSON strings from metastore event
|
||||
private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer();
|
||||
|
||||
|
||||
// event factory which is used to get or create MetastoreEvents
|
||||
private final MetastoreEventFactory metastoreEventFactory;
|
||||
|
||||
private boolean isRunning;
|
||||
|
||||
public MetastoreEventsProcessor() {
|
||||
super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms);
|
||||
this.metastoreEventFactory = new MetastoreEventFactory();
|
||||
this.isRunning = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the next batch of NotificationEvents from metastore. The default batch size is
|
||||
* <code>{@link Config#hms_events_batch_size_per_rpc}</code>
|
||||
*/
|
||||
private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
|
||||
LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
|
||||
NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
|
||||
|
||||
if (response == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return response.getEvents();
|
||||
}
|
||||
|
||||
private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
|
||||
for (MetastoreEvent event : events) {
|
||||
try {
|
||||
event.process();
|
||||
} catch (Exception e) {
|
||||
hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the given list of notification events. Useful for tests which provide a list of events
|
||||
*/
|
||||
private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
|
||||
//transfer
|
||||
List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
|
||||
doExecute(metastoreEvents, hmsExternalCatalog);
|
||||
hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
if (isRunning) {
|
||||
LOG.warn("Last task not finished,ignore current task.");
|
||||
return;
|
||||
}
|
||||
isRunning = true;
|
||||
try {
|
||||
realRun();
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Task failed", ex);
|
||||
}
|
||||
isRunning = false;
|
||||
}
|
||||
|
||||
private void realRun() {
|
||||
List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
|
||||
for (Long catalogId : catalogIds) {
|
||||
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
if (catalog instanceof HMSExternalCatalog) {
|
||||
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
|
||||
List<NotificationEvent> events = Collections.emptyList();
|
||||
try {
|
||||
events = getNextHMSEvents(hmsExternalCatalog);
|
||||
if (!events.isEmpty()) {
|
||||
LOG.info("Events size are {} on catalog [{}]", events.size(),
|
||||
hmsExternalCatalog.getName());
|
||||
processEvents(events, hmsExternalCatalog);
|
||||
}
|
||||
} catch (MetastoreNotificationFetchException e) {
|
||||
LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to process hive metastore [{}] events .",
|
||||
hmsExternalCatalog.getName(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static MessageDeserializer getMessageDeserializer() {
|
||||
return MESSAGE_DESERIALIZER;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
/**
|
||||
* Utility exception class to be thrown for errors during event processing
|
||||
*/
|
||||
public class MetastoreNotificationException extends RuntimeException {
|
||||
|
||||
public MetastoreNotificationException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
}
|
||||
|
||||
public MetastoreNotificationException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
public MetastoreNotificationException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
/**
|
||||
* Utility exception class to be thrown for errors during event processing
|
||||
*/
|
||||
public class MetastoreNotificationFetchException extends MetastoreNotificationException {
|
||||
|
||||
public MetastoreNotificationFetchException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
}
|
||||
|
||||
public MetastoreNotificationFetchException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
public MetastoreNotificationFetchException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,50 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.datasource.hive.event;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Base class for all the table events
|
||||
*/
|
||||
public abstract class MetastoreTableEvent extends MetastoreEvent {
|
||||
|
||||
|
||||
protected MetastoreTableEvent(NotificationEvent event, String catalogName) {
|
||||
super(event, catalogName);
|
||||
Preconditions.checkNotNull(dbName, "Database name cannot be null");
|
||||
Preconditions.checkNotNull(tblName, "Table name cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of parameters that are set by Hive for tables/partitions that can be
|
||||
* ignored to determine if the alter table/partition event is a trivial one.
|
||||
*/
|
||||
private static final List<String> PARAMETERS_TO_IGNORE =
|
||||
new ImmutableList.Builder<String>()
|
||||
.add("transient_lastDdlTime")
|
||||
.add("numFilesErasureCoded")
|
||||
.add("numFiles")
|
||||
.add("comment")
|
||||
.build();
|
||||
}
|
||||
@ -706,6 +706,7 @@ public class JournalEntity implements Writable {
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_REFRESH_EXTERNAL_DB:
|
||||
case OperationType.OP_DROP_EXTERNAL_TABLE:
|
||||
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
|
||||
data = ExternalObjectLog.read(in);
|
||||
isRead = true;
|
||||
|
||||
@ -949,6 +949,11 @@ public class EditLog {
|
||||
env.getCatalogMgr().replayRefreshExternalTable(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_DROP_EXTERNAL_TABLE: {
|
||||
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
|
||||
env.getCatalogMgr().replayDropExternalTable(log);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_INIT_EXTERNAL_TABLE: {
|
||||
// Do nothing.
|
||||
break;
|
||||
@ -1624,6 +1629,10 @@ public class EditLog {
|
||||
logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
|
||||
}
|
||||
|
||||
public void logDropExternalTable(ExternalObjectLog log) {
|
||||
logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
|
||||
}
|
||||
|
||||
public Journal getJournal() {
|
||||
return this.journal;
|
||||
}
|
||||
|
||||
@ -255,6 +255,8 @@ public class OperationType {
|
||||
public static final short OP_DROP_MTMV_TASK = 341;
|
||||
public static final short OP_ALTER_MTMV_TASK = 342;
|
||||
|
||||
public static final short OP_DROP_EXTERNAL_TABLE = 350;
|
||||
|
||||
public static final short OP_ALTER_USER = 400;
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user