[feature](multi-catalog) support more hive metastore events (#15702)

Support more Hive Metastore event types: create/drop database, create/drop/alter table, and add/drop/alter partition.
zhangdong
2023-01-16 14:16:12 +08:00
committed by GitHub
parent fa03c8a241
commit 899f5f5cf5
27 changed files with 1572 additions and 121 deletions


@@ -56,7 +56,7 @@ public class RefreshManager {
refreshInternalCtlIcebergTable(stmt, env);
} else {
// Process external catalog table refresh
- refreshExternalCtlTable(dbName, tableName, catalog);
+ env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName);
}
LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName);
}
@@ -146,25 +146,4 @@ public class RefreshManager {
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
- private void refreshExternalCtlTable(String dbName, String tableName, CatalogIf catalog) throws DdlException {
- if (!(catalog instanceof ExternalCatalog)) {
- throw new DdlException("Only support refresh ExternalCatalog Tables");
- }
- DatabaseIf db = catalog.getDbNullable(dbName);
- if (db == null) {
- throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
- }
- TableIf table = db.getTableNullable(tableName);
- if (table == null) {
- throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
- }
- Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
- ExternalObjectLog log = new ExternalObjectLog();
- log.setCatalogId(catalog.getId());
- log.setDbId(db.getId());
- log.setTableId(table.getId());
- Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
- }
}
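The deleted helper above shows the pattern for the rest of this commit: table/database/partition maintenance moves out of per-component helpers and into CatalogMgr, where each mutation can also be written to the edit log for follower replay. A minimal, self-contained sketch of the new call path; CatalogMgrLike is an illustrative stand-in, not a Doris type:

// Sketch: RefreshManager no longer resolves catalog/db/table itself. The shared
// CatalogMgr entry point does, and it is also where the edit-log record is written.
class RefreshPathSketch {
    interface CatalogMgrLike {
        void refreshExternalTable(String dbName, String tableName, String catalogName) throws Exception;
    }

    private final CatalogMgrLike catalogMgr;

    RefreshPathSketch(CatalogMgrLike catalogMgr) {
        this.catalogMgr = catalogMgr;
    }

    void refreshTable(String catalogName, String dbName, String tableName) throws Exception {
        // before: a private helper invalidated the cache and logged the entry here
        // after: one shared, logged code path in CatalogMgr
        catalogMgr.refreshExternalTable(dbName, tableName, catalogName);
    }
}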


@@ -263,4 +263,8 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
public void dropTable(String tableName) {
throw new NotImplementedException();
}
public void createTable(String tableName, long tableId) {
throw new NotImplementedException();
}
}


@@ -174,10 +174,21 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
@Override
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
makeSureInitialized();
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
LOG.warn("drop table [{}] failed", tableName);
}
idToTbl.remove(tableId);
}
@Override
public void createTable(String tableName, long tableId) {
LOG.debug("create table [{}]", tableName);
makeSureInitialized();
tableNameToId.put(tableName, tableId);
HMSExternalTable table = new HMSExternalTable(tableId, tableName, name,
(HMSExternalCatalog) extCatalog);
idToTbl.put(tableId, table);
}
}
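HMSExternalDatabase keeps two maps in sync: tableNameToId for lookups by name and idToTbl for lookups by id during edit-log replay. Below is a self-contained sketch of that bookkeeping, with an explicit early return when the name is unknown; the diff above only logs a warning and still calls idToTbl.remove(tableId) with a possibly-null id, which a plain HashMap tolerates but a ConcurrentHashMap would not:

import java.util.HashMap;
import java.util.Map;

// Sketch of the two-map table registry: name -> id for name lookups,
// id -> table for replay-time lookups. String stands in for the table object.
class TableRegistrySketch {
    private final Map<String, Long> tableNameToId = new HashMap<>();
    private final Map<Long, String> idToTbl = new HashMap<>();

    void createTable(String tableName, long tableId) {
        tableNameToId.put(tableName, tableId);
        idToTbl.put(tableId, tableName);
    }

    void dropTable(String tableName) {
        Long tableId = tableNameToId.remove(tableName);
        if (tableId == null) {
            return; // nothing registered under this name; skip remove(null)
        }
        idToTbl.remove(tableId);
    }
}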


@@ -1953,6 +1953,6 @@ public class Config extends ConfigBase {
* HMS polling interval in milliseconds.
*/
@ConfField(masterOnly = true)
- public static int hms_events_polling_interval_ms = 20000;
+ public static int hms_events_polling_interval_ms = 10000;
}
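Halving the polling interval makes external metadata converge faster at the cost of more Metastore round trips. A self-contained sketch of how such an interval typically drives an events processor; the runnable body is a placeholder, not the actual Doris polling code:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: wake up every hms_events_polling_interval_ms and drain new
// notification events from the Hive Metastore.
public class PollingSketch {
    static final int HMS_EVENTS_POLLING_INTERVAL_MS = 10_000; // was 20_000 before this commit

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("fetch and process new HMS notification events"),
                0, HMS_EVENTS_POLLING_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}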


@@ -553,10 +553,47 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
}
}
public void refreshExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support refresh ExternalCatalog Tables");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
TableIf table = db.getTableNullable(tableName);
if (table == null) {
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
}
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
public void replayRefreshExternalTable(ExternalObjectLog log) {
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
}
@@ -590,13 +627,321 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
- db.dropTable(table.getName());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
db.writeLock();
try {
db.dropTable(table.getName());
} finally {
db.writeUnlock();
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
}
public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support ExternalCatalog Tables");
}
return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName);
}
public void createExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support create ExternalCatalog Tables");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
TableIf table = db.getTableNullable(tableName);
if (table != null) {
throw new DdlException("Table " + tableName + " has exist in db " + dbName);
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableName(tableName);
log.setTableId(Env.getCurrentEnv().getNextId());
replayCreateExternalTable(log);
Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
}
public void replayCreateExternalTable(ExternalObjectLog log) {
LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId(), log.getTableName());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
db.writeLock();
try {
db.createTable(log.getTableName(), log.getTableId());
} finally {
db.writeUnlock();
}
}
public void dropExternalDatabase(String dbName, String catalogName) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support drop ExternalCatalog databases");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setInvalidCache(true);
replayDropExternalDatabase(log);
Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log);
}
public void replayDropExternalDatabase(ExternalObjectLog log) {
writeLock();
try {
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
catalog.dropDatabase(db.getFullName());
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName());
} finally {
writeUnlock();
}
}
public void createExternalDatabase(String dbName, String catalogName) throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support create ExternalCatalog databases");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db != null) {
throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName());
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(Env.getCurrentEnv().getNextId());
log.setDbName(dbName);
replayCreateExternalDatabase(log);
Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log);
}
public void replayCreateExternalDatabase(ExternalObjectLog log) {
writeLock();
try {
LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(),
log.getDbId(), log.getDbName());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
catalog.createDatabase(log.getDbId(), log.getDbName());
} finally {
writeUnlock();
}
}
public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support ExternalCatalog");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
TableIf table = db.getTableNullable(tableName);
if (table == null) {
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
replayAddExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
}
public void replayAddExternalPartitions(ExternalObjectLog log) {
LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
}
public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support ExternalCatalog");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
TableIf table = db.getTableNullable(tableName);
if (table == null) {
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
replayDropExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
}
public void replayDropExternalPartitions(ExternalObjectLog log) {
LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.dropPartitionsCache(catalog.getId(), table, log.getPartitionNames());
}
public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
List<String> partitionNames)
throws DdlException {
CatalogIf catalog = nameToCatalog.get(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support ExternalCatalog");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
TableIf table = db.getTableNullable(tableName);
if (table == null) {
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
}
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
log.setPartitionNames(partitionNames);
replayRefreshExternalPartitions(log);
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
}
public void replayRefreshExternalPartitions(ExternalObjectLog log) {
LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
log.getDbId(), log.getTableId());
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
if (catalog == null) {
LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId());
return;
}
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
if (db == null) {
LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId());
return;
}
ExternalTable table = db.getTableForReplay(log.getTableId());
if (table == null) {
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
return;
}
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
log.getPartitionNames());
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
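Every mutation added to CatalogMgr above follows the same master/follower idiom: the master applies the change to its own in-memory state, either by calling the replayX method directly or by performing the invalidation inline, then appends an ExternalObjectLog entry to the edit log; follower FEs later call the same replayX method with the deserialized entry. A minimal, self-contained sketch of that idiom with illustrative names:

import java.util.function.Consumer;

// Sketch of the log-then-replay idiom. The two consumers stand in for, e.g.,
// this::replayCreateExternalTable and editLog::logCreateExternalTable.
final class LogThenReplaySketch<T> {
    private final Consumer<T> replay;      // applies the change to in-memory state
    private final Consumer<T> appendToLog; // persists the entry for followers

    LogThenReplaySketch(Consumer<T> replay, Consumer<T> appendToLog) {
        this.replay = replay;
        this.appendToLog = appendToLog;
    }

    void executeOnMaster(T logEntry) {
        replay.accept(logEntry);      // master applies locally first
        appendToLog.accept(logEntry); // then writes the edit log
    }

    void executeOnFollower(T logEntry) {
        replay.accept(logEntry);      // followers only replay
    }
}

This is also why the replay methods above are defensive (null checks and warnings instead of exceptions): by the time a follower replays an entry, the catalog, database, or table may already have been dropped.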


@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -98,6 +99,17 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
*/
public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);
/**
* Check if the specified table exists in Doris.
*
* @param dbName
* @param tblName
* @return true if table exists, false otherwise
*/
public boolean tableExistInLocal(String dbName, String tblName) {
throw new NotImplementedException();
}
/**
* The catalog can't be initialized at creation time because an external catalog may depend on a third-party system.
* So make sure the third-party system's client is initialized before any method is called.
@@ -310,4 +322,12 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
public void dropDatabase(String dbName) {
throw new NotImplementedException();
}
public void createDatabase(long dbId, String dbName) {
throw new NotImplementedException();
}
}
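The base class throws NotImplementedException for operations that only some catalog types support, and HMSExternalCatalog overrides them later in this commit. A compact sketch of this optional-capability pattern, using UnsupportedOperationException so the snippet carries no commons-lang dependency:

// Sketch: the abstract catalog rejects db-level mutations by default;
// a concrete catalog type opts in by overriding.
abstract class CatalogSketch {
    void dropDatabase(String dbName) {
        throw new UnsupportedOperationException("not supported by this catalog type");
    }

    void createDatabase(long dbId, String dbName) {
        throw new UnsupportedOperationException("not supported by this catalog type");
    }
}

class HmsCatalogSketch extends CatalogSketch {
    @Override
    void dropDatabase(String dbName) { /* remove from dbNameToId and idToDb */ }

    @Override
    void createDatabase(long dbId, String dbName) { /* register in both maps */ }
}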


@@ -17,6 +17,8 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -25,6 +27,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -118,4 +121,45 @@ public class ExternalMetaCacheMgr {
}
LOG.debug("invalid catalog cache for {}", catalogId);
}
public void addPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
if (metaCache != null) {
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames,
((HMSExternalTable) table).getPartitionColumnTypes());
}
LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
}
public void dropPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
return;
}
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
if (metaCache != null) {
metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames,
((HMSExternalTable) table).getPartitionColumnTypes(), true);
}
LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
}
public void invalidatePartitionsCache(long catalogId, String dbName, String tableName,
List<String> partitionNames) {
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
if (metaCache != null) {
dbName = ClusterNamespace.getNameFromFullName(dbName);
for (String partitionName : partitionNames) {
metaCache.invalidatePartitionCache(dbName, tableName, partitionName);
}
}
LOG.debug("invalidate partition cache for {}.{} in catalog {}", dbName, tableName, catalogId);
}
}


@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@NoArgsConstructor
@Getter
@@ -40,12 +41,21 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "dbName")
private String dbName;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "tableName")
private String tableName;
@SerializedName(value = "invalidCache")
private boolean invalidCache;
@SerializedName(value = "partitionNames")
private List<String> partitionNames;
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
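ExternalObjectLog is persisted as a single GSON-encoded JSON string (the write() above), which is what lets one log class carry ids, names, partition lists, and flags for all of the new event types. A self-contained sketch of that round-trip with a simplified entry type:

import com.google.gson.Gson;

// Sketch of the serialization idiom behind ExternalObjectLog: serialize the
// whole object to JSON on write, deserialize it back on replay. Simplified fields.
class LogEntrySketch {
    long catalogId;
    long dbId;
    long tableId;
    String tableName;

    public static void main(String[] args) {
        Gson gson = new Gson();
        LogEntrySketch log = new LogEntrySketch();
        log.catalogId = 1;
        log.dbId = 2;
        log.tableName = "t1";
        String json = gson.toJson(log); // what write() persists
        LogEntrySketch replayed = gson.fromJson(json, LogEntrySketch.class); // what replay reads
        System.out.println(json + " -> " + replayed.tableName);
    }
}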


@@ -154,6 +154,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
return client.tableExists(getRealTableName(dbName), tblName);
}
@Override
public boolean tableExistInLocal(String dbName, String tblName) {
makeSureInitialized();
HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName));
if (hmsExternalDatabase == null) {
return false;
}
return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
}
public PooledHiveMetaStoreClient getClient() {
makeSureInitialized();
return client;
@@ -215,4 +225,24 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
return currentNotificationEventId.getEventId();
}
@Override
public void dropDatabase(String dbName) {
LOG.debug("drop database [{}]", dbName);
makeSureInitialized();
Long dbId = dbNameToId.remove(dbName);
if (dbId == null) {
LOG.warn("drop database [{}] failed", dbName);
}
idToDb.remove(dbId);
}
@Override
public void createDatabase(long dbId, String dbName) {
makeSureInitialized();
LOG.debug("create database [{}]", dbName);
dbNameToId.put(dbName, dbId);
HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
idToDb.put(dbId, db);
}
}


@@ -45,6 +45,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -163,27 +165,36 @@ public class HiveMetaStoreCache {
LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName());
}
Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
Map<String, Long> partitionNameToIdMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
long idx = 0;
for (String partitionName : partitionNames) {
idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types));
long partitionId = idx++;
ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types);
idToPartitionItem.put(partitionId, listPartitionItem);
partitionNameToIdMap.put(partitionName, partitionId);
}
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
Map<Range<PartitionKey>, UniqueId> rangeToId = null;
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
if (key.types.size() > 1) {
// uidToPartitionRange and rangeToId are only used for multi-column partition
- uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem);
+ uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
} else {
Preconditions.checkState(key.types.size() == 1, key.types);
// singleColumnRangeMap is only used for single-column partition
- singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem);
+ singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
}
- return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap);
+ Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+ return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx,
+ partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
}
- private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
+ public ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
// Partition name will be in format: nation=cn/city=beijing
// parse it to get values "cn" and "beijing"
String[] parts = partitionName.split("/");
@@ -274,6 +285,10 @@ public class HiveMetaStoreCache {
public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types);
return getPartitionValues(key);
}
public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
try {
return partitionValuesCache.get(key);
} catch (ExecutionException e) {
@@ -350,6 +365,21 @@ public class HiveMetaStoreCache {
}
}
public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
if (partitionValues != null) {
Long partitionId = partitionValues.partitionNameToIdMap.get(partitionName);
List<String> values = partitionValues.partitionValuesMap.get(partitionId);
PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCache.invalidate(new FileCacheKey(partition.getPath(), null));
partitionCache.invalidate(partKey);
}
}
}
public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
@@ -369,6 +399,113 @@ public class HiveMetaStoreCache {
LOG.debug("invalid all meta cache in catalog {}", catalog.getName());
}
// partition name format: nation=cn/city=beijing
public void addPartitionsCache(String dbName, String tblName, List<String> partitionNames,
List<Type> partitionColumnTypes) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes);
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
if (partitionValues == null) {
return;
}
HivePartitionValues copy = partitionValues.copy();
Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
Map<Long, List<UniqueId>> idToUniqueIdsMap = copy.getIdToUniqueIdsMap();
Map<Long, PartitionItem> idToPartitionItem = new HashMap<>();
long idx = copy.getNextPartitionId();
for (String partitionName : partitionNames) {
if (partitionNameToIdMapBefore.containsKey(partitionName)) {
LOG.info("addPartitionsCache partitionName:[{}] has exist in table:[{}]", partitionName, tblName);
continue;
}
long partitionId = idx++;
ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types);
idToPartitionItemBefore.put(partitionId, listPartitionItem);
idToPartitionItem.put(partitionId, listPartitionItem);
partitionNameToIdMapBefore.put(partitionName, partitionId);
}
Map<Long, List<String>> partitionValuesMapBefore = copy.getPartitionValuesMap();
Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
partitionValuesMapBefore.putAll(partitionValuesMap);
copy.setNextPartitionId(idx);
if (key.types.size() > 1) {
Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange();
// uidToPartitionRange and rangeToId are only used for multi-column partition
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = ListPartitionPrunerV2
.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
uidToPartitionRangeBefore.putAll(uidToPartitionRange);
Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId();
Map<Range<PartitionKey>, UniqueId> rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
rangeToIdBefore.putAll(rangeToId);
} else {
Preconditions.checkState(key.types.size() == 1, key.types);
// singleColumnRangeMap is only used for single-column partition
RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap();
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = ListPartitionPrunerV2
.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
singleColumnRangeMapBefore.putAll(singleColumnRangeMap);
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy
.getSingleUidToColumnRangeMap();
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = ListPartitionPrunerV2
.genSingleUidToColumnRange(singleColumnRangeMap);
singleUidToColumnRangeMapBefore.putAll(singleUidToColumnRangeMap);
}
HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
if (partitionValuesCur == partitionValues) {
partitionValuesCache.put(key, copy);
}
}
public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames,
List<Type> partitionColumnTypes, boolean invalidPartitionCache) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes);
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
if (partitionValues == null) {
return;
}
HivePartitionValues copy = partitionValues.copy();
Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
Map<Long, List<UniqueId>> idToUniqueIdsMapBefore = copy.getIdToUniqueIdsMap();
Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange();
Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId();
RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap();
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy.getSingleUidToColumnRangeMap();
Map<Long, List<String>> partitionValuesMap = copy.getPartitionValuesMap();
for (String partitionName : partitionNames) {
if (!partitionNameToIdMapBefore.containsKey(partitionName)) {
LOG.info("dropPartitionsCache partitionName:[{}] not exist in table:[{}]", partitionName, tblName);
continue;
}
Long partitionId = partitionNameToIdMapBefore.remove(partitionName);
idToPartitionItemBefore.remove(partitionId);
partitionValuesMap.remove(partitionId);
List<UniqueId> uniqueIds = idToUniqueIdsMapBefore.remove(partitionId);
if (key.types.size() > 1) {
for (UniqueId uniqueId : uniqueIds) {
Range<PartitionKey> range = uidToPartitionRangeBefore.remove(uniqueId);
rangeToIdBefore.remove(range);
}
} else {
for (UniqueId uniqueId : uniqueIds) {
Range<ColumnBound> range = singleUidToColumnRangeMapBefore.remove(uniqueId);
singleColumnRangeMapBefore.remove(range);
}
}
if (invalidPartitionCache) {
invalidatePartitionCache(dbName, tblName, partitionName);
}
}
HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
if (partitionValuesCur == partitionValues) {
partitionValuesCache.put(key, copy);
}
}
public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HivePartitionValues values) {
partitionValuesCache.put(key, values);
}
/**
* The Key of hive partition value cache
*/
@@ -480,24 +617,58 @@ public class HiveMetaStoreCache {
@Data
public static class HivePartitionValues {
private long nextPartitionId;
private Map<String, Long> partitionNameToIdMap;
private Map<Long, List<UniqueId>> idToUniqueIdsMap;
private Map<Long, PartitionItem> idToPartitionItem;
- private Map<Long, List<String>> partitionValuesMap = Maps.newHashMap();
+ private Map<Long, List<String>> partitionValuesMap;
//multi pair
private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
private Map<Range<PartitionKey>, UniqueId> rangeToId;
//single pair
private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;
public HivePartitionValues() {
}
public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
Map<Range<PartitionKey>, UniqueId> rangeToId,
- RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
long nextPartitionId,
Map<String, Long> partitionNameToIdMap,
Map<Long, List<UniqueId>> idToUniqueIdsMap,
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap,
Map<Long, List<String>> partitionValuesMap) {
this.idToPartitionItem = idToPartitionItem;
- for (Map.Entry<Long, PartitionItem> entry : this.idToPartitionItem.entrySet()) {
- partitionValuesMap.put(entry.getKey(),
- ((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
- }
this.uidToPartitionRange = uidToPartitionRange;
this.rangeToId = rangeToId;
this.singleColumnRangeMap = singleColumnRangeMap;
this.nextPartitionId = nextPartitionId;
this.partitionNameToIdMap = partitionNameToIdMap;
this.idToUniqueIdsMap = idToUniqueIdsMap;
this.singleUidToColumnRangeMap = singleUidToColumnRangeMap;
this.partitionValuesMap = partitionValuesMap;
}
public HivePartitionValues copy() {
HivePartitionValues copy = new HivePartitionValues();
copy.setNextPartitionId(nextPartitionId);
copy.setPartitionNameToIdMap(partitionNameToIdMap == null ? null : Maps.newHashMap(partitionNameToIdMap));
copy.setIdToUniqueIdsMap(idToUniqueIdsMap == null ? null : Maps.newHashMap(idToUniqueIdsMap));
copy.setIdToPartitionItem(idToPartitionItem == null ? null : Maps.newHashMap(idToPartitionItem));
copy.setPartitionValuesMap(partitionValuesMap == null ? null : Maps.newHashMap(partitionValuesMap));
copy.setUidToPartitionRange(uidToPartitionRange == null ? null : Maps.newHashMap(uidToPartitionRange));
copy.setRangeToId(rangeToId == null ? null : Maps.newHashMap(rangeToId));
copy.setSingleUidToColumnRangeMap(
singleUidToColumnRangeMap == null ? null : Maps.newHashMap(singleUidToColumnRangeMap));
if (singleColumnRangeMap != null) {
RangeMap<ColumnBound, UniqueId> copySingleColumnRangeMap = TreeRangeMap.create();
copySingleColumnRangeMap.putAll(singleColumnRangeMap);
copy.setSingleColumnRangeMap(copySingleColumnRangeMap);
}
return copy;
}
}
}
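addPartitionsCache and dropPartitionsCache above update the Guava cache with a copy-on-write scheme: snapshot the cached HivePartitionValues, deep-copy it via copy(), mutate the copy, then put it back only if the cache still holds the original snapshot (a reference-equality check). A self-contained sketch of the idiom; the ConcurrentMap.replace call here is the atomic analogue of the getIfPresent/put pair in the diff, which is best-effort rather than atomic:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: mutate a private copy, publish it only if nobody replaced the
// snapshot meanwhile. A lost race is harmless because the next cache load
// rebuilds the value from the Metastore.
class CopyOnWriteCacheSketch {
    private final ConcurrentMap<String, Map<String, Long>> cache = new ConcurrentHashMap<>();

    void addPartition(String tableKey, String partitionName, long partitionId) {
        Map<String, Long> snapshot = cache.get(tableKey);
        if (snapshot == null) {
            return; // nothing cached yet; a later load will see the new partition
        }
        Map<String, Long> copy = new HashMap<>(snapshot); // mutate a private copy
        copy.put(partitionName, partitionId);
        cache.replace(tableKey, snapshot, copy); // publish only if still the same snapshot
    }
}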


@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* MetastoreEvent for ADD_PARTITION event type
*/
public class AddPartitionEvent extends MetastoreTableEvent {
private final Table hmsTbl;
private final List<String> partitionNames;
private AddPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ADD_PARTITION));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
AddPartitionMessage addPartitionMessage =
MetastoreEventsProcessor.getMessageDeserializer()
.getAddPartitionMessage(event.getMessage());
hmsTbl = Preconditions.checkNotNull(addPartitionMessage.getTableObj());
Iterable<Partition> addedPartitions = addPartitionMessage.getPartitionObjs();
partitionNames = new ArrayList<>();
List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
.map(FieldSchema::getName).collect(Collectors.toList());
addedPartitions.forEach(partition -> partitionNames.add(
FileUtils.makePartName(partitionColNames, partition.getValues())));
} catch (Exception ex) {
throw new MetastoreNotificationException(ex);
}
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AddPartitionEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName,
partitionNames.toString());
// bail out early if there are no partitions to process
if (partitionNames.isEmpty()) {
infoLog("Partition list is empty. Ignoring this event.");
return;
}
Env.getCurrentEnv().getCatalogMgr()
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}
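The partition names used as cache keys are rebuilt from the event payload with Hive's FileUtils.makePartName, which joins column=value pairs with '/' and escapes special characters. A quick usage sketch:

import java.util.Arrays;

import org.apache.hadoop.hive.common.FileUtils;

// makePartName produces the canonical Hive partition name, e.g. "nation=cn/city=beijing".
public class PartNameSketch {
    public static void main(String[] args) {
        String name = FileUtils.makePartName(
                Arrays.asList("nation", "city"),
                Arrays.asList("cn", "beijing"));
        System.out.println(name); // nation=cn/city=beijing
    }
}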


@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
* MetastoreEvent for ALTER_DATABASE event type
*/
public class AlterDatabaseEvent extends MetastoreEvent {
private AlterDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
// An ALTER_DATABASE event can only change database properties, so there is nothing to do
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
}
}


@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import java.util.List;
import java.util.stream.Collectors;
/**
* MetastoreEvent for ALTER_PARTITION event type
*/
public class AlterPartitionEvent extends MetastoreTableEvent {
private final Table hmsTbl;
private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter;
private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore;
private final String partitionNameBefore;
private final String partitionNameAfter;
// true if this alter event was due to a rename operation
private final boolean isRename;
private AlterPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_PARTITION));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
AlterPartitionMessage alterPartitionMessage =
MetastoreEventsProcessor.getMessageDeserializer()
.getAlterPartitionMessage(event.getMessage());
hmsTbl = Preconditions.checkNotNull(alterPartitionMessage.getTableObj());
partitionBefore = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
partitionAfter = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
.map(FieldSchema::getName).collect(Collectors.toList());
partitionNameBefore = FileUtils.makePartName(partitionColNames, partitionBefore.getValues());
partitionNameAfter = FileUtils.makePartName(partitionColNames, partitionAfter.getValues());
isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
} catch (Exception ex) {
throw new MetastoreNotificationException(ex);
}
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNameBefore:[{}],partitionNameAfter:[{}]",
catalogName, dbName, tblName, partitionNameBefore, partitionNameAfter);
if (isRename) {
Env.getCurrentEnv().getCatalogMgr()
.dropExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameBefore));
Env.getCurrentEnv().getCatalogMgr()
.addExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameAfter));
} else {
Env.getCurrentEnv().getCatalogMgr()
.refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
Lists.newArrayList(partitionNameAfter));
}
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}


@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
import java.util.List;
/**
* MetastoreEvent for ALTER_TABLE event type
*/
public class AlterTableEvent extends MetastoreTableEvent {
// the table object before alter operation
private final Table tableBefore;
// the table object after alter operation
private final Table tableAfter;
// true if this alter event was due to a rename operation
private final boolean isRename;
private AlterTableEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType()));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
JSONAlterTableMessage alterTableMessage =
(JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
.getAlterTableMessage(event.getMessage());
tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter table message"), e);
}
// this is a rename event if either the dbName or the tblName differs between the before and after objects
isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
|| !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
}
public static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterTableEvent(event, catalogName));
}
private void processRename() throws DdlException {
if (!isRename) {
return;
}
boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
.externalTableExistInLocal(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
if (hasExist) {
infoLog("AlterExternalTable canceled,because tableAfter has exist, "
+ "catalogName:[{}],dbName:[{}],tableName:[{}]",
catalogName, dbName, tableAfter.getTableName());
return;
}
Env.getCurrentEnv().getCatalogMgr()
.dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
Env.getCurrentEnv().getCatalogMgr()
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
}
/**
* If the ALTER_TABLE event is due to a table rename, this method removes the old table
* and creates a new table with the new name. Otherwise, we just refresh the table.
*/
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableBefore:[{}],tableAfter:[{}]", catalogName, dbName,
tableBefore.getTableName(), tableAfter.getTableName());
if (isRename) {
processRename();
return;
}
// The scope of the refresh can be narrowed in the future
Env.getCurrentEnv().getCatalogMgr()
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}
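AlterTableEvent and AlterPartitionEvent both reduce a rename to drop-old-then-create-new, guarded by an existence check so that a duplicate or already-applied event becomes a no-op. A condensed, self-contained sketch of that idempotent rename; the interface methods are stand-ins for the CatalogMgr calls in the diff:

// Sketch of processRename(): skip if the target name is already registered
// locally, otherwise drop the stale entry and register the new one.
class RenameSketch {
    interface LocalMeta {
        boolean existsLocally(String db, String table);
        void drop(String db, String table);
        void create(String db, String table);
    }

    static void processRename(LocalMeta meta, String dbBefore, String tblBefore,
                              String dbAfter, String tblAfter) {
        if (meta.existsLocally(dbAfter, tblAfter)) {
            return; // event already applied (or name collision): do nothing
        }
        meta.drop(dbBefore, tblBefore);
        meta.create(dbAfter, tblAfter);
    }
}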


@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
* MetastoreEvent for CREATE_DATABASE event type
*/
public class CreateDatabaseEvent extends MetastoreEvent {
private CreateDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.CREATE_DATABASE));
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new CreateDatabaseEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
Env.getCurrentEnv().getCatalogMgr()
.createExternalDatabase(dbName, catalogName);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}


@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import java.util.List;
/**
* MetastoreEvent for CREATE_TABLE event type
*/
public class CreateTableEvent extends MetastoreTableEvent {
private final Table hmsTbl;
private CreateTableEvent(NotificationEvent event, String catalogName) throws MetastoreNotificationException {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
CreateTableMessage createTableMessage =
MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage());
hmsTbl = Preconditions.checkNotNull(createTableMessage.getTableObj());
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to deserialize the event message"), e);
}
}
public static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) {
return Lists.newArrayList(new CreateTableEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
.externalTableExistInLocal(dbName, hmsTbl.getTableName(), catalogName);
if (hasExist) {
infoLog(
"CreateExternalTable canceled,because table has exist,"
+ "catalogName:[{}],dbName:[{}],tableName:[{}]",
catalogName, dbName, tblName);
return;
}
Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}


@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
* MetastoreEvent for DROP_DATABASE event type
*/
public class DropDatabaseEvent extends MetastoreEvent {
private DropDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_DATABASE));
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new DropDatabaseEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
Env.getCurrentEnv().getCatalogMgr()
.dropExternalDatabase(dbName, catalogName);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"));
}
}
}


@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive.event;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* MetastoreEvent for DROP_PARTITION event type
*/
public class DropPartitionEvent extends MetastoreTableEvent {
private final Table hmsTbl;
private final List<String> partitionNames;
private DropPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_PARTITION));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
DropPartitionMessage dropPartitionMessage =
MetastoreEventsProcessor.getMessageDeserializer()
.getDropPartitionMessage(event.getMessage());
hmsTbl = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
List<Map<String, String>> droppedPartitions = dropPartitionMessage.getPartitions();
partitionNames = new ArrayList<>();
List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
.map(FieldSchema::getName).collect(Collectors.toList());
droppedPartitions.forEach(partition -> partitionNames.add(
getPartitionName(partition, partitionColNames)));
} catch (Exception ex) {
throw new MetastoreNotificationException(ex);
}
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(
new DropPartitionEvent(event, catalogName));
}
@Override
protected void process() throws MetastoreNotificationException {
try {
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName,
partitionNames.toString());
// bail out early if there are no partitions to process
if (partitionNames.isEmpty()) {
infoLog("Partition list is empty. Ignoring this event.");
return;
}
Env.getCurrentEnv().getCatalogMgr()
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
} catch (DdlException e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
}
}
}

View File

@ -25,8 +25,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
@ -34,25 +32,21 @@ import java.util.List;
* MetastoreEvent for DROP_TABLE event type
*/
public class DropTableEvent extends MetastoreTableEvent {
private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
private final String dbName;
private final String tableName;
private DropTableEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
JSONDropTableMessage dropTableMessage =
(JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
.getDropTableMessage(event.getMessage());
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
dbName = dropTableMessage.getDB();
JSONDropTableMessage dropTableMessage =
(JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
.getDropTableMessage(event.getMessage());
tableName = dropTableMessage.getTable();
} catch (Exception e) {
throw new MetastoreNotificationException(debugString(
"Could not parse event message. "
+ "Check if %s is set to true in metastore configuration",
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
throw new MetastoreNotificationException(e);
}
}
@ -61,29 +55,14 @@ public class DropTableEvent extends MetastoreTableEvent {
return Lists.newArrayList(new DropTableEvent(event, catalogName));
}
@Override
protected boolean existInCache() {
return true;
}
@Override
protected boolean canBeSkipped() {
return false;
}
protected boolean isSupported() {
return true;
}
@Override
protected void process() throws MetastoreNotificationException {
try {
LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
tableName);
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tableName);
Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
} catch (DdlException e) {
LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
catalogName, e);
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
}
}
}

View File

@ -27,17 +27,17 @@ import java.util.List;
* An event type which is ignored. Useful for unsupported metastore event types.
*/
public class IgnoredEvent extends MetastoreEvent {
protected IgnoredEvent(NotificationEvent event, String catalogName) {
private IgnoredEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
}
private static List<MetastoreEvent> getEvents(NotificationEvent event,
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new IgnoredEvent(event, catalogName));
}
@Override
public void process() {
debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
infoLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
}
}

View File

@ -22,6 +22,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
* process a NotificationEvent received from metastore.
@ -105,11 +108,6 @@ public abstract class MetastoreEvent {
return null;
}
protected boolean existInCache() throws MetastoreNotificationException {
return false;
}
/**
* Returns the number of events represented by this event. For most events this is 1.
* In case of batch events this could be more than 1.
@ -128,14 +126,6 @@ public abstract class MetastoreEvent {
return false;
}
/**
* Whether the current version of FE supports processing this kind of event; some event types
* are reserved and may be processed in a later version.
*/
protected boolean isSupported() {
return false;
}
/**
* Process the information available in the NotificationEvent.
*/
@ -196,6 +186,26 @@ public abstract class MetastoreEvent {
LOG.debug(formatString, formatArgs);
}
protected String getPartitionName(Map<String, String> part, List<String> partitionColNames) {
if (part.isEmpty() || partitionColNames.size() != part.size()) {
return "";
}
StringBuilder name = new StringBuilder();
int i = 0;
for (String colName : partitionColNames) {
if (i++ > 0) {
name.append("/");
}
name.append(colName);
name.append("=");
name.append(part.get(colName));
}
return name.toString();
}
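For a concrete trace of the loop above: with partition columns [y, m] and the key-value map {y=2020, m=1}, the result is "y=2020/m=1". A self-contained sketch of the same join logic follows; the class and method names are illustrative only, not part of this patch.

    // Standalone illustration of the partition-name join (hypothetical demo class).
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PartitionNameDemo {
        static String partitionName(Map<String, String> part, List<String> colNames) {
            if (part.isEmpty() || part.size() != colNames.size()) {
                return "";
            }
            // Column order comes from the table definition, not from map iteration order.
            return colNames.stream()
                    .map(col -> col + "=" + part.get(col))
                    .collect(Collectors.joining("/"));
        }

        public static void main(String[] args) {
            Map<String, String> part = new LinkedHashMap<>();
            part.put("y", "2020");
            part.put("m", "1");
            System.out.println(partitionName(part, Arrays.asList("y", "m"))); // y=2020/m=1
        }
    }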
@Override
public String toString() {
return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);

View File

@ -26,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Factory class to create various MetastoreEvents.
@ -42,31 +40,36 @@ public class MetastoreEventFactory implements EventFactory {
Preconditions.checkNotNull(event.getEventType());
MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
switch (metastoreEventType) {
case CREATE_TABLE:
return CreateTableEvent.getEvents(event, catalogName);
case DROP_TABLE:
return DropTableEvent.getEvents(event, catalogName);
case ALTER_TABLE:
return AlterTableEvent.getEvents(event, catalogName);
case CREATE_DATABASE:
return CreateDatabaseEvent.getEvents(event, catalogName);
case DROP_DATABASE:
return DropDatabaseEvent.getEvents(event, catalogName);
case ALTER_DATABASE:
return AlterDatabaseEvent.getEvents(event, catalogName);
case ADD_PARTITION:
return AddPartitionEvent.getEvents(event, catalogName);
case DROP_PARTITION:
return DropPartitionEvent.getEvents(event, catalogName);
case ALTER_PARTITION:
return AlterPartitionEvent.getEvents(event, catalogName);
default:
// ignore all the unknown events by creating an IgnoredEvent
return Lists.newArrayList(new IgnoredEvent(event, catalogName));
return IgnoredEvent.getEvents(event, catalogName);
}
}
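The switch above is a plain string-to-handler dispatch: each supported HMS event type gets a dedicated MetastoreEvent subclass, and anything unrecognized degrades to IgnoredEvent instead of failing. A minimal sketch of that fallback pattern, with an invented enum (not the Doris API):

    // Hypothetical enum; mirrors the "unknown type becomes a no-op" default branch above.
    enum DemoEventType {
        CREATE_TABLE, DROP_TABLE, DROP_DATABASE, OTHER;

        static DemoEventType from(String hmsType) {
            try {
                return valueOf(hmsType);
            } catch (IllegalArgumentException e) {
                return OTHER; // unrecognized HMS event types are ignored, not rejected
            }
        }
    }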
List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
for (NotificationEvent event : events) {
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName()));
}
List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream()
.filter(MetastoreEvent::isSupported)
.collect(Collectors.toList());
if (tobeProcessEvents.isEmpty()) {
LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
return Collections.emptyList();
}
return createBatchEvents(tobeProcessEvents);
return createBatchEvents(metastoreEvents);
}
/**

View File

@ -713,6 +713,12 @@ public class JournalEntity implements Writable {
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
case OperationType.OP_DROP_EXTERNAL_TABLE:
case OperationType.OP_CREATE_EXTERNAL_TABLE:
case OperationType.OP_DROP_EXTERNAL_DB:
case OperationType.OP_CREATE_EXTERNAL_DB:
case OperationType.OP_ADD_EXTERNAL_PARTITIONS:
case OperationType.OP_DROP_EXTERNAL_PARTITIONS:
case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS:
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
data = ExternalObjectLog.read(in);
isRead = true;

View File

@ -959,6 +959,36 @@ public class EditLog {
env.getCatalogMgr().replayDropExternalTable(log);
break;
}
case OperationType.OP_CREATE_EXTERNAL_TABLE: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayCreateExternalTable(log);
break;
}
case OperationType.OP_DROP_EXTERNAL_DB: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayDropExternalDatabase(log);
break;
}
case OperationType.OP_CREATE_EXTERNAL_DB: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayCreateExternalDatabase(log);
break;
}
case OperationType.OP_ADD_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayAddExternalPartitions(log);
break;
}
case OperationType.OP_DROP_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayDropExternalPartitions(log);
break;
}
case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
env.getCatalogMgr().replayRefreshExternalPartitions(log);
break;
}
case OperationType.OP_INIT_EXTERNAL_TABLE: {
// Do nothing.
break;
@ -1650,6 +1680,30 @@ public class EditLog {
logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
}
public void logCreateExternalTable(ExternalObjectLog log) {
logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log);
}
public void logDropExternalDatabase(ExternalObjectLog log) {
logEdit(OperationType.OP_DROP_EXTERNAL_DB, log);
}
public void logCreateExternalDatabase(ExternalObjectLog log) {
logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log);
}
public void logAddExternalPartitions(ExternalObjectLog log) {
logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log);
}
public void logDropExternalPartitions(ExternalObjectLog log) {
logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log);
}
public void logInvalidateExternalPartitions(ExternalObjectLog log) {
logEdit(OperationType.OP_REFRESH_EXTERNAL_PARTITIONS, log);
}
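Each helper here pairs with a replay case added earlier in this file: the master serializes an ExternalObjectLog under the new op code, and replaying nodes route the same code to the matching replay method on CatalogMgr. A schematic of that log/replay contract, with invented names (not the Doris journal API):

    // Hypothetical mini-journal; shows the symmetry, not Doris internals.
    class MiniJournal {
        void logEdit(short opCode, Object payload) {
            // append (opCode, payload) to the persistent journal on the master
        }

        void replay(short opCode, Object payload) {
            switch (opCode) {
                case 354: addExternalPartitions(payload); break;  // OP_ADD_EXTERNAL_PARTITIONS
                case 355: dropExternalPartitions(payload); break; // OP_DROP_EXTERNAL_PARTITIONS
                default: throw new IllegalStateException("unknown op code: " + opCode);
            }
        }

        void addExternalPartitions(Object p) { /* apply to in-memory catalog state */ }
        void dropExternalPartitions(Object p) { /* apply to in-memory catalog state */ }
    }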
public Journal getJournal() {
return this.journal;
}

View File

@ -258,6 +258,12 @@ public class OperationType {
public static final short OP_ALTER_MTMV_TASK = 342;
public static final short OP_DROP_EXTERNAL_TABLE = 350;
public static final short OP_DROP_EXTERNAL_DB = 351;
public static final short OP_CREATE_EXTERNAL_TABLE = 352;
public static final short OP_CREATE_EXTERNAL_DB = 353;
public static final short OP_ADD_EXTERNAL_PARTITIONS = 354;
public static final short OP_DROP_EXTERNAL_PARTITIONS = 355;
public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
public static final short OP_ALTER_USER = 400;

View File

@ -18,6 +18,7 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.AnalysisException;
@ -33,8 +34,10 @@ import com.google.common.collect.TreeRangeMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -59,7 +62,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
super(idToPartitionItem, partitionColumns, columnNameToRange);
this.uidToPartitionRange = Maps.newHashMap();
if (partitionColumns.size() > 1) {
this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem);
this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem, new HashMap<>());
this.rangeToId = genRangeToId(uidToPartitionRange);
}
}
@ -77,16 +80,21 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
}
public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange(
Map<Long, PartitionItem> idToPartitionItem) {
Map<Long, PartitionItem> idToPartitionItem, Map<Long, List<UniqueId>> idToUniqueIdsMap) {
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap();
idToPartitionItem.forEach((id, item) -> {
List<PartitionKey> keys = item.getItems();
List<Range<PartitionKey>> ranges = keys.stream()
.map(key -> Range.closed(key, key))
.collect(Collectors.toList());
List<UniqueId> uniqueIds = idToUniqueIdsMap.computeIfAbsent(id, k -> new ArrayList<>(ranges.size()));
for (int i = 0; i < ranges.size(); i++) {
uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i));
ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i);
uidToPartitionRange.put(listPartitionUniqueId, ranges.get(i));
uniqueIds.add(listPartitionUniqueId);
}
});
return uidToPartitionRange;
}
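Each value of a list partition becomes a degenerate point range, Range.closed(key, key), identified by a (partitionId, index) pair. A standalone Guava example of the point-range construction, with Integer standing in for PartitionKey:

    import com.google.common.collect.Range;

    class PointRangeDemo {
        public static void main(String[] args) {
            // A list partition holding values 1 and 3 yields two point ranges:
            Range<Integer> first = Range.closed(1, 1);   // uid = (partitionId, 0)
            Range<Integer> second = Range.closed(3, 3);  // uid = (partitionId, 1)
            System.out.println(first.contains(1));   // true
            System.out.println(second.contains(2));  // false
        }
    }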
@ -94,21 +102,35 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
@Override
void genSingleColumnRangeMap() {
if (singleColumnRangeMap == null) {
singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem);
singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem, new HashMap<>());
}
}
public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
public static Map<Long, List<String>> getPartitionValuesMap(Map<Long, PartitionItem> idToPartitionItem) {
Map<Long, List<String>> partitionValuesMap = new HashMap<>();
for (Map.Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
partitionValuesMap.put(entry.getKey(),
((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
}
return partitionValuesMap;
}
public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem,
Map<Long, List<UniqueId>> idToUniqueIdsMap) {
RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
idToPartitionItem.forEach((id, item) -> {
List<PartitionKey> keys = item.getItems();
List<Range<PartitionKey>> ranges = keys.stream()
.map(key -> Range.closed(key, key))
.collect(Collectors.toList());
List<UniqueId> uniqueIds = idToUniqueIdsMap.computeIfAbsent(id, k -> new ArrayList<>(ranges.size()));
for (int i = 0; i < ranges.size(); i++) {
candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
new ListPartitionUniqueId(id, i));
ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i);
candidate.put(mapPartitionKeyRange(ranges.get(i), 0), listPartitionUniqueId);
uniqueIds.add(listPartitionUniqueId);
}
});
return candidate;
}
@ -151,6 +173,13 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
return rangeToId;
}
public static Map<UniqueId, Range<ColumnBound>> genSingleUidToColumnRange(
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
Map<UniqueId, Range<ColumnBound>> uidToColumnRange = Maps.newHashMap();
singleColumnRangeMap.asMapOfRanges().forEach((columnBound, uid) -> uidToColumnRange.put(uid, columnBound));
return uidToColumnRange;
}
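genSingleUidToColumnRange is a plain inversion of the RangeMap; for list partitions each range maps to exactly one uid, so the inversion loses nothing. A standalone Guava illustration, with String standing in for UniqueId and Integer for ColumnBound:

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeMap;
    import com.google.common.collect.TreeRangeMap;
    import java.util.HashMap;
    import java.util.Map;

    class InvertRangeMapDemo {
        public static void main(String[] args) {
            RangeMap<Integer, String> byRange = TreeRangeMap.create();
            byRange.put(Range.closed(1, 1), "uidA");
            byRange.put(Range.closed(3, 3), "uidB");

            // Invert: value -> range. Safe because the point ranges are disjoint.
            Map<String, Range<Integer>> byUid = new HashMap<>();
            byRange.asMapOfRanges().forEach((range, uid) -> byUid.put(uid, range));
            System.out.println(byUid.get("uidB")); // [3..3]
        }
    }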
private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters,
Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
int columnIdx) {
@ -178,20 +207,20 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
grouped.forEach(candidateRangeMap::put);
return finalFilters.filters.stream()
.map(filter -> {
RangeMap<ColumnBound, List<UniqueId>> filtered =
candidateRangeMap.subRangeMap(filter);
// Find PartitionKey ranges according to filtered UniqueIds.
Map<Range<PartitionKey>, UniqueId> filteredPartitionRange =
filtered.asMapOfRanges().values()
.stream()
.flatMap(List::stream)
.collect(Collectors.toMap(uidToPartitionRange::get, Function.identity()));
return doPruneMultiple(columnToFilters, filteredPartitionRange,
columnIdx + 1);
})
.flatMap(Collection::stream)
.collect(Collectors.toSet());
case NO_FILTERS:
default:
return doPruneMultiple(columnToFilters, partitionRangeToUid, columnIdx + 1);
@ -215,9 +244,9 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("partitionId", partitionId)
.add("partitionKeyIndex", partitionKeyIndex)
.toString();
.add("partitionId", partitionId)
.add("partitionKeyIndex", partitionKeyIndex)
.toString();
}
@Override

View File

@ -32,8 +32,12 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ResourceMgr;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
@ -41,14 +45,23 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -58,6 +71,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -69,13 +83,14 @@ public class CatalogMgrTest extends TestWithFeService {
private static UserIdentity user2;
private CatalogMgr mgr;
private ResourceMgr resourceMgr;
private ExternalMetaCacheMgr externalMetaCacheMgr;
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
mgr = Env.getCurrentEnv().getCatalogMgr();
resourceMgr = Env.getCurrentEnv().getResourceMgr();
externalMetaCacheMgr = Env.getCurrentEnv().getExtMetaCacheMgr();
ConnectContext rootCtx = createDefaultCtx();
env = Env.getCurrentEnv();
auth = env.getAuth();
@ -417,4 +432,119 @@ public class CatalogMgrTest extends TestWithFeService {
}
}
@Test
public void testAddMultiColumnPartitionsCache() {
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
Lists.newArrayList(Type.INT, Type.SMALLINT));
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache);
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=3", "y=2020/m=4"),
partitionValueCacheKey.getTypes());
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
Assert.assertEquals(4, partitionValues.getPartitionNameToIdMap().size());
}
@Test
public void testDropMultiColumnPartitionsCache() {
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
Lists.newArrayList(Type.INT, Type.SMALLINT));
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache);
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
partitionValueCacheKey.getTypes(), false);
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
Assert.assertEquals(0, partitionValues.getPartitionNameToIdMap().size());
}
@Test
public void testAddSingleColumnPartitionsCache() {
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
Lists.newArrayList(Type.SMALLINT));
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
Lists.newArrayList("m=1", "m=2"), metaStoreCache);
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=3", "m=4"),
partitionValueCacheKey.getTypes());
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
Assert.assertEquals(4, partitionValues.getPartitionNameToIdMap().size());
}
@Test
public void testDropSingleColumnPartitionsCache() {
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
Lists.newArrayList(Type.SMALLINT));
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
Lists.newArrayList("m=1", "m=2"), metaStoreCache);
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=1", "m=2"),
partitionValueCacheKey.getTypes(), false);
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
Assert.assertEquals(0, partitionValues.getPartitionNameToIdMap().size());
}
@Test
public void testAddPartitionsCacheToLargeTable() {
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
Lists.newArrayList(Type.INT));
List<String> pNames = new ArrayList<>(100000);
for (int i = 1; i <= 100000; i++) {
pNames.add("m=" + i);
}
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
pNames, metaStoreCache);
metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
long start = System.currentTimeMillis();
metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=100001"),
partitionValueCacheKey.getTypes());
// takes roughly 387 ms on a 4c16g machine
System.out.println("testAddPartitionsCacheToLargeTable use time millis:" + (System.currentTimeMillis() - start));
HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
Assert.assertEquals(100001, partitionValues.getPartitionNameToIdMap().size());
}
private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key, List<String> partitionNames,
HiveMetaStoreCache metaStoreCache) {
// partition name format: nation=cn/city=beijing
Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
Map<String, Long> partitionNameToIdMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
long idx = 0;
for (String partitionName : partitionNames) {
long partitionId = idx++;
ListPartitionItem listPartitionItem = metaStoreCache.toListPartitionItem(partitionName, key.getTypes());
idToPartitionItem.put(partitionId, listPartitionItem);
partitionNameToIdMap.put(partitionName, partitionId);
}
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
Map<Range<PartitionKey>, UniqueId> rangeToId = null;
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
if (key.getTypes().size() > 1) {
// uidToPartitionRange and rangeToId are only used for multi-column partition
uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
} else {
Preconditions.checkState(key.getTypes().size() == 1, key.getTypes());
// singleColumnRangeMap is only used for single-column partition
singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
}
Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx,
partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
}
}