bp #33610
This commit is contained in:
@ -49,7 +49,6 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -416,6 +415,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean registerTable(TableIf table) {
|
||||
boolean result = true;
|
||||
Table olapTable = (Table) table;
|
||||
@ -851,11 +851,6 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Long, TableIf> getIdToTable() {
|
||||
return new HashMap<>(idToTable);
|
||||
}
|
||||
|
||||
public void replayUpdateDbProperties(Map<String, String> properties) {
|
||||
dbProperties.updateProperties(properties);
|
||||
if (PropertyAnalyzer.hasBinlogConfig(properties)) {
|
||||
|
||||
@ -31,7 +31,6 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
@ -75,7 +74,7 @@ public interface DatabaseIf<T extends TableIf> {
|
||||
|
||||
List<T> getTables();
|
||||
|
||||
default List<T> getTablesOrEmpty() {
|
||||
default List<T> getTablesIgnoreException() {
|
||||
try {
|
||||
return getTables();
|
||||
} catch (Exception e) {
|
||||
@ -281,6 +280,4 @@ public interface DatabaseIf<T extends TableIf> {
|
||||
default long getLastUpdateTime() {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
Map<Long, TableIf> getIdToTable();
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -125,20 +126,8 @@ public class JdbcResource extends Resource {
|
||||
CONNECTION_POOL_MAX_LIFE_TIME,
|
||||
CONNECTION_POOL_MAX_WAIT_TIME,
|
||||
CONNECTION_POOL_KEEP_ALIVE,
|
||||
TEST_CONNECTION
|
||||
).build();
|
||||
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_META_NAMES,
|
||||
META_NAMES_MAPPING,
|
||||
INCLUDE_DATABASE_LIST,
|
||||
EXCLUDE_DATABASE_LIST,
|
||||
CONNECTION_POOL_MIN_SIZE,
|
||||
CONNECTION_POOL_MAX_SIZE,
|
||||
CONNECTION_POOL_MAX_LIFE_TIME,
|
||||
CONNECTION_POOL_MAX_WAIT_TIME,
|
||||
CONNECTION_POOL_KEEP_ALIVE,
|
||||
TEST_CONNECTION
|
||||
TEST_CONNECTION,
|
||||
ExternalCatalog.USE_META_CACHE
|
||||
).build();
|
||||
|
||||
// The default value of optional properties
|
||||
@ -157,6 +146,8 @@ public class JdbcResource extends Resource {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ExternalCatalog.USE_META_CACHE,
|
||||
String.valueOf(ExternalCatalog.DEFAULT_USE_META_CACHE));
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
@ -225,7 +216,7 @@ public class JdbcResource extends Resource {
|
||||
|
||||
@Override
|
||||
public void applyDefaultProperties() {
|
||||
for (String s : OPTIONAL_PROPERTIES) {
|
||||
for (String s : OPTIONAL_PROPERTIES_DEFAULT_VALUE.keySet()) {
|
||||
if (!configs.containsKey(s)) {
|
||||
configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s));
|
||||
}
|
||||
@ -487,3 +478,4 @@ public class JdbcResource extends Resource {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.alter.Alter;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.CreateViewStmt;
|
||||
import org.apache.doris.common.Pair;
|
||||
|
||||
@ -43,17 +41,6 @@ public abstract class MysqlCompatibleDatabase extends Database {
|
||||
*/
|
||||
protected abstract void initTables();
|
||||
|
||||
/**
|
||||
* Currently, rename a table of InfoSchemaDb will throw exception
|
||||
* {@link Alter#processAlterTable(AlterTableStmt)}
|
||||
* so we follow this design.
|
||||
* @note: Rename a table of mysql database in MYSQL ls allowed.
|
||||
*/
|
||||
@Override
|
||||
public boolean registerTable(TableIf table) {
|
||||
return super.registerTable(table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterTable(String name) {
|
||||
// Do nothing
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -108,10 +109,14 @@ public class RefreshManager {
|
||||
|
||||
private void refreshDbInternal(long catalogId, long dbId, boolean invalidCache) {
|
||||
ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
ExternalDatabase db = catalog.getDbForReplay(dbId);
|
||||
db.setUnInitialized(invalidCache);
|
||||
LOG.info("refresh database {} in catalog {} with invalidCache {}", db.getFullName(), catalog.getName(),
|
||||
invalidCache);
|
||||
Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(dbId);
|
||||
// Database may not exist if 'use_meta_cache' is true.
|
||||
// Because each FE fetch the meta data independently.
|
||||
db.ifPresent(e -> {
|
||||
e.setUnInitialized(invalidCache);
|
||||
LOG.info("refresh database {} in catalog {} with invalidCache {}", e.getFullName(),
|
||||
catalog.getName(), invalidCache);
|
||||
});
|
||||
}
|
||||
|
||||
// Refresh table
|
||||
@ -163,13 +168,22 @@ public class RefreshManager {
|
||||
LOG.warn("failed to find catalog replaying refresh table {}", log.getCatalogId());
|
||||
return;
|
||||
}
|
||||
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
|
||||
TableIf table = db.getTableForReplay(log.getTableId());
|
||||
refreshTableInternal(catalog, db, table, log.getLastUpdateTime());
|
||||
Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(log.getDbId());
|
||||
// See comment in refreshDbInternal for why db and table may be null.
|
||||
if (!db.isPresent()) {
|
||||
LOG.warn("failed to find db replaying refresh table {}", log.getDbId());
|
||||
return;
|
||||
}
|
||||
Optional<? extends ExternalTable> table = db.get().getTableForReplay(log.getTableId());
|
||||
if (!table.isPresent()) {
|
||||
LOG.warn("failed to find table replaying refresh table {}", log.getTableId());
|
||||
return;
|
||||
}
|
||||
refreshTableInternal(catalog, db.get(), table.get(), log.getLastUpdateTime());
|
||||
}
|
||||
|
||||
public void refreshExternalTableFromEvent(String catalogName, String dbName, String tableName,
|
||||
long updateTime, boolean ignoreIfNotExists) throws DdlException {
|
||||
long updateTime) throws DdlException {
|
||||
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
@ -179,17 +193,11 @@ public class RefreshManager {
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
refreshTableInternal(catalog, db, table, updateTime);
|
||||
|
||||
@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener;
|
||||
import com.github.benmanes.caffeine.cache.Ticker;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
@ -74,9 +75,13 @@ public class CacheFactory {
|
||||
}
|
||||
|
||||
// Build a loading cache, with executor, it will use given executor for refresh
|
||||
public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader, ExecutorService executor) {
|
||||
public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader,
|
||||
RemovalListener<K, V> removalListener, ExecutorService executor) {
|
||||
Caffeine<Object, Object> builder = buildWithParams();
|
||||
builder.executor(executor);
|
||||
if (removalListener != null) {
|
||||
builder.removalListener(removalListener);
|
||||
}
|
||||
return builder.build(cacheLoader);
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -1348,6 +1349,14 @@ public class PropertyAnalyzer {
|
||||
throw new AnalysisException("failed to find class " + acClass, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (isAlter) {
|
||||
// The 'use_meta_cache' property can not be modified
|
||||
if (properties.containsKey(ExternalCatalog.USE_META_CACHE)) {
|
||||
throw new AnalysisException("Can not modify property " + ExternalCatalog.USE_META_CACHE
|
||||
+ ". You need to create a new Catalog with the property.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, String> rewriteReplicaAllocationProperties(
|
||||
|
||||
@ -44,6 +44,9 @@ import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
@ -651,4 +654,21 @@ public class Util {
|
||||
p.printStackTrace(pw);
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
public static long sha256long(String str) {
|
||||
try {
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
byte[] hash = digest.digest(str.getBytes());
|
||||
ByteBuffer buffer = ByteBuffer.wrap(hash);
|
||||
return buffer.getLong();
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
return str.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
// Only used for external table's id generation
|
||||
// And the table's id must >=0, see DescriptorTable.toThrift()
|
||||
public static long genTableIdByName(String tblName) {
|
||||
return Math.abs(sha256long(tblName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,10 +142,13 @@ public class CatalogFactory {
|
||||
default:
|
||||
throw new DdlException("Unknown catalog type: " + catalogType);
|
||||
}
|
||||
|
||||
// set some default properties if missing when creating catalog.
|
||||
// both replaying the creating logic will call this method.
|
||||
catalog.setDefaultPropsIfMissing(isReplay);
|
||||
|
||||
if (!isReplay) {
|
||||
// set some default properties when creating catalog.
|
||||
// do not call this method when replaying edit log. Because we need to keey the original properties.
|
||||
catalog.setDefaultPropsWhenCreating(isReplay);
|
||||
catalog.checkWhenCreating();
|
||||
// This will check if the customized access controller can be created successfully.
|
||||
// If failed, it will throw exception and the catalog will not be created.
|
||||
try {
|
||||
|
||||
@ -40,7 +40,6 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@ -184,8 +183,6 @@ public interface CatalogIf<T extends DatabaseIf> {
|
||||
|
||||
boolean enableAutoAnalyze();
|
||||
|
||||
ConcurrentHashMap<Long, DatabaseIf> getIdToDb();
|
||||
|
||||
void createDb(CreateDbStmt stmt) throws DdlException;
|
||||
|
||||
void dropDb(DropDbStmt stmt) throws DdlException;
|
||||
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.hive.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
@ -66,6 +67,7 @@ import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -587,11 +589,11 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
if (catalog == null) {
|
||||
return;
|
||||
}
|
||||
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
|
||||
if (db == null) {
|
||||
Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(log.getDbId());
|
||||
if (!db.isPresent()) {
|
||||
return;
|
||||
}
|
||||
db.replayInitDb(log, catalog);
|
||||
db.get().replayInitDb(log, catalog);
|
||||
}
|
||||
|
||||
public void unregisterExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
|
||||
@ -663,7 +665,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
return;
|
||||
}
|
||||
long tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName);
|
||||
long tblId;
|
||||
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
|
||||
if (hmsCatalog.getUseMetaCache().get()) {
|
||||
tblId = Util.genTableIdByName(tableName);
|
||||
} else {
|
||||
tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName);
|
||||
}
|
||||
// -1L means it will be dropped later, ignore
|
||||
if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
|
||||
return;
|
||||
@ -715,13 +723,19 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
return;
|
||||
}
|
||||
|
||||
long dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName);
|
||||
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
|
||||
long dbId;
|
||||
if (hmsCatalog.getUseMetaCache().get()) {
|
||||
dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName);
|
||||
} else {
|
||||
dbId = Util.genTableIdByName(dbName);
|
||||
}
|
||||
// -1L means it will be dropped later, ignore
|
||||
if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
|
||||
return;
|
||||
}
|
||||
|
||||
((HMSExternalCatalog) catalog).registerDatabase(dbId, dbName);
|
||||
hmsCatalog.registerDatabase(dbId, dbName);
|
||||
}
|
||||
|
||||
public void addExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.catalog.MysqlDb;
|
||||
import org.apache.doris.catalog.Resource;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.Version;
|
||||
@ -44,6 +45,7 @@ import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
|
||||
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
|
||||
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
|
||||
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
|
||||
import org.apache.doris.datasource.metacache.MetaCache;
|
||||
import org.apache.doris.datasource.operations.ExternalMetadataOps;
|
||||
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
@ -54,9 +56,11 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.MasterCatalogExecutor;
|
||||
import org.apache.doris.transaction.TransactionManager;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
@ -65,6 +69,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.DataInput;
|
||||
@ -77,19 +82,24 @@ import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The abstract class for all types of external catalogs.
|
||||
*/
|
||||
@Data
|
||||
public abstract class ExternalCatalog
|
||||
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
|
||||
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
|
||||
|
||||
public static final String ENABLE_AUTO_ANALYZE = "enable.auto.analyze";
|
||||
public static final String DORIS_VERSION = "doris.version";
|
||||
public static final String DORIS_VERSION_VALUE = Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH;
|
||||
public static final String USE_META_CACHE = "use_meta_cache";
|
||||
// Set default value to false to be compatible with older version meta data.
|
||||
public static final boolean DEFAULT_USE_META_CACHE = false;
|
||||
|
||||
// Unique id of this catalog, will be assigned after catalog is loaded.
|
||||
@SerializedName(value = "id")
|
||||
@ -123,6 +133,9 @@ public abstract class ExternalCatalog
|
||||
private byte[] propLock = new byte[0];
|
||||
private Map<String, String> convertedProperties = null;
|
||||
|
||||
protected Optional<Boolean> useMetaCache = Optional.empty();
|
||||
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
|
||||
|
||||
public ExternalCatalog() {
|
||||
}
|
||||
|
||||
@ -130,7 +143,7 @@ public abstract class ExternalCatalog
|
||||
this.id = catalogId;
|
||||
this.name = name;
|
||||
this.logType = logType;
|
||||
this.comment = com.google.common.base.Strings.nullToEmpty(comment);
|
||||
this.comment = Strings.nullToEmpty(comment);
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
@ -142,11 +155,6 @@ public abstract class ExternalCatalog
|
||||
return conf;
|
||||
}
|
||||
|
||||
// only for test
|
||||
public void setInitialized() {
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* set some default properties when creating catalog
|
||||
* @return list of database names in this catalog
|
||||
@ -160,8 +168,21 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
}
|
||||
|
||||
public void setDefaultPropsWhenCreating(boolean isReplay) throws DdlException {
|
||||
// Will be called when creating catalog(so when as replaying)
|
||||
// to add some default properties if missing.
|
||||
public void setDefaultPropsIfMissing(boolean isReplay) {
|
||||
if (catalogProperty.getOrDefault(USE_META_CACHE, "").isEmpty()) {
|
||||
// If not setting USE_META_CACHE in replay logic,
|
||||
// set default value to false to be compatible with older version meta data.
|
||||
catalogProperty.addProperty(USE_META_CACHE, isReplay ? "false" : String.valueOf(DEFAULT_USE_META_CACHE));
|
||||
}
|
||||
useMetaCache = Optional.of(
|
||||
Boolean.valueOf(catalogProperty.getOrDefault(USE_META_CACHE, String.valueOf(DEFAULT_USE_META_CACHE))));
|
||||
}
|
||||
|
||||
// Will be called when creating catalog(not replaying).
|
||||
// Subclass can override this method to do some check when creating catalog.
|
||||
public void checkWhenCreating() throws DdlException {
|
||||
}
|
||||
|
||||
/**
|
||||
@ -204,19 +225,36 @@ public abstract class ExternalCatalog
|
||||
public final synchronized void makeSureInitialized() {
|
||||
initLocalObjects();
|
||||
if (!initialized) {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
// Forward to master and wait the journal to replay.
|
||||
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
|
||||
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
|
||||
try {
|
||||
remoteExecutor.forward(id, -1);
|
||||
} catch (Exception e) {
|
||||
Util.logAndThrowRuntimeException(LOG,
|
||||
String.format("failed to forward init catalog %s operation to master.", name), e);
|
||||
if (useMetaCache.get()) {
|
||||
if (metaCache != null) {
|
||||
metaCache.invalidateAll();
|
||||
} else {
|
||||
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
|
||||
name,
|
||||
OptionalLong.of(86400L),
|
||||
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
|
||||
Config.max_hive_table_cache_num,
|
||||
ignored -> getFilteredDatabaseNames(),
|
||||
dbName -> Optional.ofNullable(
|
||||
buildDbForInit(dbName, Util.genTableIdByName(dbName), logType)),
|
||||
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
|
||||
}
|
||||
return;
|
||||
setLastUpdateTime(System.currentTimeMillis());
|
||||
} else {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
// Forward to master and wait the journal to replay.
|
||||
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
|
||||
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
|
||||
try {
|
||||
remoteExecutor.forward(id, -1);
|
||||
} catch (Exception e) {
|
||||
Util.logAndThrowRuntimeException(LOG,
|
||||
String.format("failed to forward init catalog %s operation to master.", name), e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
init();
|
||||
}
|
||||
init();
|
||||
initialized = true;
|
||||
}
|
||||
}
|
||||
@ -283,18 +321,13 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
|
||||
// init schema related objects
|
||||
protected void init() {
|
||||
private void init() {
|
||||
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
|
||||
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
|
||||
InitCatalogLog initCatalogLog = new InitCatalogLog();
|
||||
initCatalogLog.setCatalogId(id);
|
||||
initCatalogLog.setType(logType);
|
||||
List<String> allDatabases = Lists.newArrayList(listDatabaseNames());
|
||||
|
||||
allDatabases.remove(InfoSchemaDb.DATABASE_NAME);
|
||||
allDatabases.add(InfoSchemaDb.DATABASE_NAME);
|
||||
allDatabases.remove(MysqlDb.DATABASE_NAME);
|
||||
allDatabases.add(MysqlDb.DATABASE_NAME);
|
||||
List<String> allDatabases = getFilteredDatabaseNames();
|
||||
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
|
||||
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
|
||||
for (String dbName : allDatabases) {
|
||||
@ -318,7 +351,7 @@ public abstract class ExternalCatalog
|
||||
} else {
|
||||
dbId = Env.getCurrentEnv().getNextId();
|
||||
tmpDbNameToId.put(dbName, dbId);
|
||||
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
|
||||
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType);
|
||||
tmpIdToDb.put(dbId, db);
|
||||
initCatalogLog.addCreateDb(dbId, dbName);
|
||||
}
|
||||
@ -331,6 +364,16 @@ public abstract class ExternalCatalog
|
||||
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private List<String> getFilteredDatabaseNames() {
|
||||
List<String> allDatabases = Lists.newArrayList(listDatabaseNames());
|
||||
allDatabases.remove(InfoSchemaDb.DATABASE_NAME);
|
||||
allDatabases.add(InfoSchemaDb.DATABASE_NAME);
|
||||
allDatabases.remove(MysqlDb.DATABASE_NAME);
|
||||
allDatabases.add(MysqlDb.DATABASE_NAME);
|
||||
return allDatabases;
|
||||
}
|
||||
|
||||
public void onRefresh(boolean invalidCache) {
|
||||
this.objectCreated = false;
|
||||
this.initialized = false;
|
||||
@ -382,12 +425,18 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
|
||||
/**
|
||||
* Different from 'listDatabases()', this method will return dbnames from cache.
|
||||
* while 'listDatabases()' will return dbnames from remote datasource.
|
||||
* @return names of database in this catalog.
|
||||
*/
|
||||
@Override
|
||||
public List<String> getDbNames() {
|
||||
makeSureInitialized();
|
||||
return new ArrayList<>(dbNameToId.keySet());
|
||||
if (useMetaCache.get()) {
|
||||
return metaCache.listNames();
|
||||
} else {
|
||||
return new ArrayList<>(dbNameToId.keySet());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -404,14 +453,10 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableNameByTableId(Long tableId) {
|
||||
for (DatabaseIf<?> db : idToDb.values()) {
|
||||
TableIf table = db.getTableNullable(tableId);
|
||||
if (table != null) {
|
||||
return new TableName(getName(), db.getFullName(), table.getName());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
throw new UnsupportedOperationException("External catalog does not support getTableNameByTableId() method."
|
||||
+ ", table id: " + tableId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -432,21 +477,23 @@ public abstract class ExternalCatalog
|
||||
return null;
|
||||
}
|
||||
String realDbName = ClusterNamespace.getNameFromFullName(dbName);
|
||||
if (dbNameToId.containsKey(realDbName)) {
|
||||
return idToDb.get(dbNameToId.get(realDbName));
|
||||
} else {
|
||||
// This maybe a information_schema db request, and information_schema db name is case insensitive.
|
||||
// So, we first extract db name to check if it is information_schema.
|
||||
// Then we reassemble the origin cluster name with lower case db name,
|
||||
// and finally get information_schema db from the name map.
|
||||
if (realDbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
|
||||
return idToDb.get(dbNameToId.get(InfoSchemaDb.DATABASE_NAME));
|
||||
}
|
||||
if (realDbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME)) {
|
||||
return idToDb.get(dbNameToId.get(MysqlDb.DATABASE_NAME));
|
||||
}
|
||||
|
||||
// information_schema db name is case-insensitive.
|
||||
// So, we first convert it to standard database name.
|
||||
if (realDbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
|
||||
realDbName = InfoSchemaDb.DATABASE_NAME;
|
||||
} else if (realDbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME)) {
|
||||
realDbName = MysqlDb.DATABASE_NAME;
|
||||
}
|
||||
|
||||
if (useMetaCache.get()) {
|
||||
return metaCache.getMetaObj(realDbName).orElse(null);
|
||||
} else {
|
||||
if (dbNameToId.containsKey(realDbName)) {
|
||||
return idToDb.get(dbNameToId.get(realDbName));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@ -458,13 +505,22 @@ public abstract class ExternalCatalog
|
||||
LOG.warn("failed to get db {} in catalog {}", dbId, name, e);
|
||||
return null;
|
||||
}
|
||||
return idToDb.get(dbId);
|
||||
|
||||
if (useMetaCache.get()) {
|
||||
return metaCache.getMetaObjById(dbId).get();
|
||||
} else {
|
||||
return idToDb.get(dbId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Long> getDbIds() {
|
||||
makeSureInitialized();
|
||||
return Lists.newArrayList(dbNameToId.values());
|
||||
if (useMetaCache.get()) {
|
||||
return getAllDbs().stream().map(DatabaseIf::getId).collect(Collectors.toList());
|
||||
} else {
|
||||
return Lists.newArrayList(dbNameToId.values());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -528,14 +584,18 @@ public abstract class ExternalCatalog
|
||||
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
|
||||
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
|
||||
for (int i = 0; i < log.getRefreshCount(); i++) {
|
||||
ExternalDatabase<? extends ExternalTable> db = getDbForReplay(log.getRefreshDbIds().get(i));
|
||||
db.setUnInitialized(invalidCacheInInit);
|
||||
tmpDbNameToId.put(db.getFullName(), db.getId());
|
||||
tmpIdToDb.put(db.getId(), db);
|
||||
Optional<ExternalDatabase<? extends ExternalTable>> db = getDbForReplay(log.getRefreshDbIds().get(i));
|
||||
// Should not return null.
|
||||
// Because replyInitCatalog can only be called when `use_meta_cache` is false.
|
||||
// And if `use_meta_cache` is false, getDbForReplay() will not return null
|
||||
Preconditions.checkNotNull(db.get());
|
||||
db.get().setUnInitialized(invalidCacheInInit);
|
||||
tmpDbNameToId.put(db.get().getFullName(), db.get().getId());
|
||||
tmpIdToDb.put(db.get().getId(), db.get());
|
||||
}
|
||||
for (int i = 0; i < log.getCreateCount(); i++) {
|
||||
ExternalDatabase<? extends ExternalTable> db =
|
||||
getDbForInit(log.getCreateDbNames().get(i), log.getCreateDbIds().get(i), log.getType());
|
||||
buildDbForInit(log.getCreateDbNames().get(i), log.getCreateDbIds().get(i), log.getType());
|
||||
if (db != null) {
|
||||
tmpDbNameToId.put(db.getFullName(), db.getId());
|
||||
tmpIdToDb.put(db.getId(), db);
|
||||
@ -547,12 +607,19 @@ public abstract class ExternalCatalog
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
public ExternalDatabase<? extends ExternalTable> getDbForReplay(long dbId) {
|
||||
return idToDb.get(dbId);
|
||||
public Optional<ExternalDatabase<? extends ExternalTable>> getDbForReplay(long dbId) {
|
||||
if (useMetaCache.get()) {
|
||||
if (!isInitialized()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return metaCache.getMetaObjById(dbId);
|
||||
} else {
|
||||
return Optional.ofNullable(idToDb.get(dbId));
|
||||
}
|
||||
}
|
||||
|
||||
protected ExternalDatabase<? extends ExternalTable> getDbForInit(String dbName, long dbId,
|
||||
InitCatalogLog.Type logType) {
|
||||
protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String dbName, long dbId,
|
||||
InitCatalogLog.Type logType) {
|
||||
if (dbName.equals(InfoSchemaDb.DATABASE_NAME)) {
|
||||
return new ExternalInfoSchemaDatabase(this, dbId);
|
||||
}
|
||||
@ -614,6 +681,8 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
}
|
||||
this.propLock = new byte[0];
|
||||
this.initialized = false;
|
||||
setDefaultPropsIfMissing(true);
|
||||
}
|
||||
|
||||
public void addDatabaseForTest(ExternalDatabase<? extends ExternalTable> db) {
|
||||
@ -722,10 +791,24 @@ public abstract class ExternalCatalog
|
||||
return null;
|
||||
}
|
||||
|
||||
// ATTN: this method only return all cached databases.
|
||||
// will not visit remote datasource's metadata
|
||||
@Override
|
||||
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
|
||||
makeSureInitialized();
|
||||
return new HashSet<>(idToDb.values());
|
||||
if (useMetaCache.get()) {
|
||||
Set<DatabaseIf<? extends TableIf>> dbs = Sets.newHashSet();
|
||||
List<String> dbNames = getDbNames();
|
||||
for (String dbName : dbNames) {
|
||||
ExternalDatabase<? extends ExternalTable> db = getDbNullable(dbName);
|
||||
if (db != null) {
|
||||
dbs.add(db);
|
||||
}
|
||||
}
|
||||
return dbs;
|
||||
} else {
|
||||
return new HashSet<>(idToDb.values());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -740,9 +823,4 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentHashMap<Long, DatabaseIf> getIdToDb() {
|
||||
return new ConcurrentHashMap<>(idToDb);
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.InfoSchemaDb;
|
||||
import org.apache.doris.catalog.MysqlDb;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
@ -32,6 +33,7 @@ import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
|
||||
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable;
|
||||
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
|
||||
import org.apache.doris.datasource.infoschema.ExternalMysqlTable;
|
||||
import org.apache.doris.datasource.metacache.MetaCache;
|
||||
import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -50,9 +52,10 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -86,6 +89,8 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
protected ExternalCatalog extCatalog;
|
||||
protected boolean invalidCacheInInit = true;
|
||||
|
||||
private MetaCache<T> metaCache;
|
||||
|
||||
/**
|
||||
* Create external database.
|
||||
*
|
||||
@ -125,19 +130,37 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
public final synchronized void makeSureInitialized() {
|
||||
extCatalog.makeSureInitialized();
|
||||
if (!initialized) {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
// Forward to master and wait the journal to replay.
|
||||
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
|
||||
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
|
||||
try {
|
||||
remoteExecutor.forward(extCatalog.getId(), id);
|
||||
} catch (Exception e) {
|
||||
Util.logAndThrowRuntimeException(LOG,
|
||||
String.format("failed to forward init external db %s operation to master", name), e);
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (metaCache != null) {
|
||||
metaCache.invalidateAll();
|
||||
} else {
|
||||
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
|
||||
name,
|
||||
OptionalLong.of(86400L),
|
||||
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
|
||||
Config.max_hive_table_cache_num,
|
||||
ignored -> listTableNames(),
|
||||
tableName -> Optional.ofNullable(
|
||||
buildTableForInit(tableName, Util.genTableIdByName(tableName), extCatalog)),
|
||||
(key, value, cause) -> value.ifPresent(ExternalTable::unsetObjectCreated));
|
||||
}
|
||||
return;
|
||||
setLastUpdateTime(System.currentTimeMillis());
|
||||
} else {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
// Forward to master and wait the journal to replay.
|
||||
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
|
||||
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
|
||||
try {
|
||||
remoteExecutor.forward(extCatalog.getId(), id);
|
||||
} catch (Exception e) {
|
||||
Util.logAndThrowRuntimeException(LOG,
|
||||
String.format("failed to forward init external db %s operation to master", name), e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
init();
|
||||
}
|
||||
init();
|
||||
initialized = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,20 +168,20 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
|
||||
Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap();
|
||||
for (int i = 0; i < log.getRefreshCount(); i++) {
|
||||
T table = getTableForReplay(log.getRefreshTableIds().get(i));
|
||||
Optional<T> table = getTableForReplay(log.getRefreshTableIds().get(i));
|
||||
// When upgrade cluster with this pr: https://github.com/apache/doris/pull/27666
|
||||
// Maybe there are some create table events will be skipped
|
||||
// if the cluster has any hms catalog(s) with hms event listener enabled.
|
||||
// So we need add a validation here to avoid table(s) not found, this is just a temporary solution
|
||||
// because later we will remove all the logics about InitCatalogLog/InitDatabaseLog.
|
||||
if (table != null) {
|
||||
table.unsetObjectCreated();
|
||||
tmpTableNameToId.put(table.getName(), table.getId());
|
||||
tmpIdToTbl.put(table.getId(), table);
|
||||
if (table.isPresent()) {
|
||||
table.get().unsetObjectCreated();
|
||||
tmpTableNameToId.put(table.get().getName(), table.get().getId());
|
||||
tmpIdToTbl.put(table.get().getId(), table.get());
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < log.getCreateCount(); i++) {
|
||||
T table = newExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog);
|
||||
T table = buildTableForInit(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog);
|
||||
tmpTableNameToId.put(table.getName(), table.getId());
|
||||
tmpIdToTbl.put(table.getId(), table);
|
||||
}
|
||||
@ -168,19 +191,12 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
protected void init() {
|
||||
private void init() {
|
||||
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
|
||||
initDatabaseLog.setType(dbLogType);
|
||||
initDatabaseLog.setCatalogId(extCatalog.getId());
|
||||
initDatabaseLog.setDbId(id);
|
||||
List<String> tableNames;
|
||||
if (name.equals(InfoSchemaDb.DATABASE_NAME)) {
|
||||
tableNames = ExternalInfoSchemaDatabase.listTableNames();
|
||||
} else if (name.equals(MysqlDb.DATABASE_NAME)) {
|
||||
tableNames = ExternalMysqlDatabase.listTableNames();
|
||||
} else {
|
||||
tableNames = extCatalog.listTableNames(null, name);
|
||||
}
|
||||
List<String> tableNames = listTableNames();
|
||||
if (tableNames != null) {
|
||||
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
|
||||
Map<Long, T> tmpIdToTbl = Maps.newHashMap();
|
||||
@ -196,7 +212,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
} else {
|
||||
tblId = Env.getCurrentEnv().getNextId();
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
T table = newExternalTable(tableName, tblId, extCatalog);
|
||||
T table = buildTableForInit(tableName, tblId, extCatalog);
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addCreateTable(tblId, tableName);
|
||||
}
|
||||
@ -207,14 +223,32 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
|
||||
lastUpdateTime = System.currentTimeMillis();
|
||||
initDatabaseLog.setLastUpdateTime(lastUpdateTime);
|
||||
initialized = true;
|
||||
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
|
||||
}
|
||||
|
||||
protected abstract T newExternalTable(String tableName, long tblId, ExternalCatalog catalog);
|
||||
private List<String> listTableNames() {
|
||||
List<String> tableNames;
|
||||
if (name.equals(InfoSchemaDb.DATABASE_NAME)) {
|
||||
tableNames = ExternalInfoSchemaDatabase.listTableNames();
|
||||
} else if (name.equals(MysqlDb.DATABASE_NAME)) {
|
||||
tableNames = ExternalMysqlDatabase.listTableNames();
|
||||
} else {
|
||||
tableNames = extCatalog.listTableNames(null, name);
|
||||
}
|
||||
return tableNames;
|
||||
}
|
||||
|
||||
public T getTableForReplay(long tableId) {
|
||||
return idToTbl.get(tableId);
|
||||
protected abstract T buildTableForInit(String tableName, long tblId, ExternalCatalog catalog);
|
||||
|
||||
public Optional<T> getTableForReplay(long tableId) {
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (!isInitialized()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return metaCache.getMetaObjById(tableId);
|
||||
} else {
|
||||
return Optional.ofNullable(idToTbl.get(tableId));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -288,10 +322,23 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
return extCatalog.tableExist(ConnectContext.get().getSessionContext(), name, tableName);
|
||||
}
|
||||
|
||||
// ATTN: this method only returned cached tables.
|
||||
@Override
|
||||
public List<T> getTables() {
|
||||
makeSureInitialized();
|
||||
return Lists.newArrayList(idToTbl.values());
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
List<T> tables = Lists.newArrayList();
|
||||
Set<String> tblNames = getTableNamesWithLock();
|
||||
for (String tblName : tblNames) {
|
||||
T tbl = getTableNullable(tblName);
|
||||
if (tbl != null) {
|
||||
tables.add(tbl);
|
||||
}
|
||||
}
|
||||
return tables;
|
||||
} else {
|
||||
return Lists.newArrayList(idToTbl.values());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -312,22 +359,34 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
@Override
|
||||
public Set<String> getTableNamesWithLock() {
|
||||
makeSureInitialized();
|
||||
return Sets.newHashSet(tableNameToId.keySet());
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
return Sets.newHashSet(metaCache.listNames());
|
||||
} else {
|
||||
return Sets.newHashSet(tableNameToId.keySet());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getTableNullable(String tableName) {
|
||||
makeSureInitialized();
|
||||
if (!tableNameToId.containsKey(tableName)) {
|
||||
return null;
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
return metaCache.getMetaObj(tableName).get();
|
||||
} else {
|
||||
if (!tableNameToId.containsKey(tableName)) {
|
||||
return null;
|
||||
}
|
||||
return idToTbl.get(tableNameToId.get(tableName));
|
||||
}
|
||||
return idToTbl.get(tableNameToId.get(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getTableNullable(long tableId) {
|
||||
makeSureInitialized();
|
||||
return idToTbl.get(tableId);
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
return metaCache.getMetaObjById(tableId).orElse(null);
|
||||
} else {
|
||||
return idToTbl.get(tableId);
|
||||
}
|
||||
}
|
||||
|
||||
public long getLastUpdateTime() {
|
||||
@ -387,12 +446,19 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("create table [{}]", tableName);
|
||||
}
|
||||
Long tableId = tableNameToId.remove(tableName);
|
||||
if (tableId == null) {
|
||||
LOG.warn("table [{}] does not exist when drop", tableName);
|
||||
return;
|
||||
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.invalidate(tableName);
|
||||
}
|
||||
} else {
|
||||
Long tableId = tableNameToId.remove(tableName);
|
||||
if (tableId == null) {
|
||||
LOG.warn("table [{}] does not exist when drop", tableName);
|
||||
return;
|
||||
}
|
||||
idToTbl.remove(tableId);
|
||||
}
|
||||
idToTbl.remove(tableId);
|
||||
setLastUpdateTime(System.currentTimeMillis());
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(
|
||||
extCatalog.getId(), getFullName(), tableName);
|
||||
@ -404,20 +470,22 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
}
|
||||
|
||||
// Only used for sync hive metastore event
|
||||
@Override
|
||||
public boolean registerTable(TableIf tableIf) {
|
||||
long tableId = tableIf.getId();
|
||||
String tableName = tableIf.getName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("create table [{}]", tableName);
|
||||
}
|
||||
tableNameToId.put(tableName, tableId);
|
||||
idToTbl.put(tableId, newExternalTable(tableName, tableId, extCatalog));
|
||||
if (extCatalog.getUseMetaCache().get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.updateCache(tableName, (T) tableIf);
|
||||
}
|
||||
} else {
|
||||
tableNameToId.put(tableName, tableId);
|
||||
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
|
||||
}
|
||||
setLastUpdateTime(System.currentTimeMillis());
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Long, TableIf> getIdToTable() {
|
||||
return new HashMap<>(idToTbl);
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,15 +30,20 @@ import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
|
||||
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
|
||||
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
|
||||
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
|
||||
import org.apache.doris.datasource.metacache.MetaCache;
|
||||
import org.apache.doris.fs.FileSystemCache;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader;
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
@ -271,4 +276,14 @@ public class ExternalMetaCacheMgr {
|
||||
LOG.debug("invalidate partition cache for {}.{} in catalog {}", dbName, tableName, catalogId);
|
||||
}
|
||||
}
|
||||
|
||||
public <T> MetaCache<T> buildMetaCache(String name,
|
||||
OptionalLong expireAfterWriteSec, OptionalLong refreshAfterWriteSec, long maxSize,
|
||||
CacheLoader<String, List<String>> namesCacheLoader,
|
||||
CacheLoader<String, Optional<T>> metaObjCacheLoader,
|
||||
RemovalListener<String, Optional<T>> removalListener) {
|
||||
MetaCache<T> metaCache = new MetaCache<>(name, commonRefreshExecutor, expireAfterWriteSec, refreshAfterWriteSec,
|
||||
maxSize, namesCacheLoader, metaObjCacheLoader, removalListener);
|
||||
return metaCache;
|
||||
}
|
||||
}
|
||||
|
||||
@ -57,7 +57,7 @@ public class ExternalSchemaCache {
|
||||
Config.max_external_schema_cache_num,
|
||||
false,
|
||||
null);
|
||||
schemaCache = schemaCacheeFactory.buildCache(key -> loadSchema(key), executor);
|
||||
schemaCache = schemaCacheeFactory.buildCache(key -> loadSchema(key), null, executor);
|
||||
}
|
||||
|
||||
private void initMetrics() {
|
||||
|
||||
@ -3419,11 +3419,6 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
return newChecksum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentHashMap<Long, DatabaseIf> getIdToDb() {
|
||||
return new ConcurrentHashMap<>(idToDb);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
|
||||
return new HashSet<>(idToDb.values());
|
||||
|
||||
@ -136,14 +136,7 @@ public class EsExternalCatalog extends ExternalCatalog {
|
||||
@Override
|
||||
public List<String> listTableNames(SessionContext ctx, String dbName) {
|
||||
makeSureInitialized();
|
||||
EsExternalDatabase db = (EsExternalDatabase) idToDb.get(dbNameToId.get(dbName));
|
||||
if (db != null && db.isInitialized()) {
|
||||
List<String> names = Lists.newArrayList();
|
||||
db.getTables().forEach(table -> names.add(table.getName()));
|
||||
return names;
|
||||
} else {
|
||||
return esRestClient.listTable(enableIncludeHiddenIndex());
|
||||
}
|
||||
return esRestClient.listTable(enableIncludeHiddenIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -38,7 +38,7 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EsExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected EsExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog);
|
||||
}
|
||||
|
||||
|
||||
@ -40,7 +40,6 @@ import org.apache.doris.fs.FileSystemProviderImpl;
|
||||
import org.apache.doris.transaction.TransactionManagerFactory;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -166,14 +165,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
@Override
|
||||
public List<String> listTableNames(SessionContext ctx, String dbName) {
|
||||
makeSureInitialized();
|
||||
HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName));
|
||||
if (hmsExternalDatabase != null && hmsExternalDatabase.isInitialized()) {
|
||||
List<String> names = Lists.newArrayList();
|
||||
hmsExternalDatabase.getTables().forEach(table -> names.add(table.getName()));
|
||||
return names;
|
||||
} else {
|
||||
return metadataOps.listTableNames(ClusterNamespace.getNameFromFullName(dbName));
|
||||
}
|
||||
return metadataOps.listTableNames(ClusterNamespace.getNameFromFullName(dbName));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -184,7 +176,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
@Override
|
||||
public boolean tableExistInLocal(String dbName, String tblName) {
|
||||
makeSureInitialized();
|
||||
HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName));
|
||||
HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) getDbNullable(dbName);
|
||||
if (hmsExternalDatabase == null) {
|
||||
return false;
|
||||
}
|
||||
@ -201,11 +193,17 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("drop database [{}]", dbName);
|
||||
}
|
||||
Long dbId = dbNameToId.remove(dbName);
|
||||
if (dbId == null) {
|
||||
LOG.warn("drop database [{}] failed", dbName);
|
||||
if (useMetaCache.get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.invalidate(dbName);
|
||||
}
|
||||
} else {
|
||||
Long dbId = dbNameToId.remove(dbName);
|
||||
if (dbId == null) {
|
||||
LOG.warn("drop database [{}] failed", dbName);
|
||||
}
|
||||
idToDb.remove(dbId);
|
||||
}
|
||||
idToDb.remove(dbId);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(getId(), dbName);
|
||||
}
|
||||
|
||||
@ -214,9 +212,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("create database [{}]", dbName);
|
||||
}
|
||||
dbNameToId.put(dbName, dbId);
|
||||
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
|
||||
idToDb.put(dbId, db);
|
||||
|
||||
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType);
|
||||
if (useMetaCache.get()) {
|
||||
if (isInitialized()) {
|
||||
metaCache.updateCache(dbName, db);
|
||||
}
|
||||
} else {
|
||||
dbNameToId.put(dbName, dbId);
|
||||
idToDb.put(dbId, db);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -229,10 +234,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDefaultPropsWhenCreating(boolean isReplay) {
|
||||
if (isReplay) {
|
||||
return;
|
||||
}
|
||||
public void setDefaultPropsIfMissing(boolean isReplay) {
|
||||
super.setDefaultPropsIfMissing(isReplay);
|
||||
if (catalogProperty.getOrDefault(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "").isEmpty()) {
|
||||
// always allow fallback to simple auth, so to support both kerberos and simple auth
|
||||
catalogProperty.addProperty(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true");
|
||||
|
||||
@ -38,7 +38,7 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HMSExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected HMSExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog);
|
||||
}
|
||||
|
||||
|
||||
@ -142,7 +142,8 @@ public class HiveMetaStoreCache {
|
||||
Config.max_hive_table_cache_num,
|
||||
false,
|
||||
null);
|
||||
partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), refreshExecutor);
|
||||
partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), null,
|
||||
refreshExecutor);
|
||||
|
||||
CacheFactory partitionCacheFactory = new CacheFactory(
|
||||
OptionalLong.of(86400L),
|
||||
@ -160,7 +161,7 @@ public class HiveMetaStoreCache {
|
||||
public Map<PartitionCacheKey, HivePartition> loadAll(Iterable<? extends PartitionCacheKey> keys) {
|
||||
return loadPartitions(keys);
|
||||
}
|
||||
}, refreshExecutor);
|
||||
}, null, refreshExecutor);
|
||||
|
||||
setNewFileCache();
|
||||
}
|
||||
@ -198,7 +199,7 @@ public class HiveMetaStoreCache {
|
||||
|
||||
LoadingCache<FileCacheKey, FileCacheValue> oldFileCache = fileCacheRef.get();
|
||||
|
||||
fileCacheRef.set(fileCacheFactory.buildCache(loader, this.refreshExecutor));
|
||||
fileCacheRef.set(fileCacheFactory.buildCache(loader, null, this.refreshExecutor));
|
||||
if (Objects.nonNull(oldFileCache)) {
|
||||
oldFileCache.invalidateAll();
|
||||
}
|
||||
|
||||
@ -158,7 +158,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
|
||||
//The scope of refresh can be narrowed in the future
|
||||
Env.getCurrentEnv().getRefreshManager()
|
||||
.refreshExternalTableFromEvent(catalogName, tableBefore.getDbName(), tableBefore.getTableName(),
|
||||
eventTime, true);
|
||||
eventTime);
|
||||
} catch (Exception e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -84,7 +84,7 @@ public class InsertEvent extends MetastoreTableEvent {
|
||||
* but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it.
|
||||
*/
|
||||
Env.getCurrentEnv().getRefreshManager().refreshExternalTableFromEvent(catalogName, dbName, tblName,
|
||||
eventTime, true);
|
||||
eventTime);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -56,7 +56,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
|
||||
Config.max_hive_table_cache_num,
|
||||
false,
|
||||
null);
|
||||
this.partitionCache = partitionCacheFactory.buildCache(key -> new TablePartitionValues(), executor);
|
||||
this.partitionCache = partitionCacheFactory.buildCache(key -> new TablePartitionValues(), null, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -45,11 +45,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
|
||||
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() {
|
||||
super.init();
|
||||
}
|
||||
|
||||
// Create catalog based on catalog type
|
||||
protected abstract void initCatalog();
|
||||
|
||||
|
||||
@ -28,7 +28,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IcebergExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected IcebergExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new IcebergExternalTable(tblId, tableName, name, (IcebergExternalCatalog) extCatalog);
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ public class IcebergMetadataCache {
|
||||
Config.max_hive_table_cache_num,
|
||||
false,
|
||||
null);
|
||||
this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), executor);
|
||||
this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), null, executor);
|
||||
|
||||
CacheFactory tableCacheFactory = new CacheFactory(
|
||||
OptionalLong.of(86400L),
|
||||
@ -67,7 +67,7 @@ public class IcebergMetadataCache {
|
||||
Config.max_hive_table_cache_num,
|
||||
false,
|
||||
null);
|
||||
this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), executor);
|
||||
this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), null, executor);
|
||||
}
|
||||
|
||||
public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) throws UserException {
|
||||
|
||||
@ -104,13 +104,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
|
||||
|
||||
@Override
|
||||
public void dropDb(DropDbStmt stmt) throws DdlException {
|
||||
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
|
||||
String dbName = stmt.getDbName();
|
||||
if (dorisCatalog.getDbNameToId().containsKey(dbName)) {
|
||||
Long aLong = dorisCatalog.getDbNameToId().get(dbName);
|
||||
dorisCatalog.getIdToDb().remove(aLong);
|
||||
dorisCatalog.getDbNameToId().remove(dbName);
|
||||
}
|
||||
if (!databaseExist(dbName)) {
|
||||
if (stmt.isSetIfExists()) {
|
||||
LOG.info("drop database[{}] which does not exist", dbName);
|
||||
@ -119,6 +113,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
|
||||
}
|
||||
}
|
||||
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
|
||||
nsCatalog.dropNamespace(Namespace.of(dbName));
|
||||
dorisCatalog.onRefresh(true);
|
||||
}
|
||||
|
||||
@ -44,7 +44,7 @@ public class ExternalInfoSchemaDatabase extends ExternalDatabase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected ExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new ExternalInfoSchemaTable(tblId, tableName, catalog);
|
||||
}
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ public class ExternalMysqlDatabase extends ExternalDatabase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected ExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new ExternalMysqlTable(tblId, tableName, catalog);
|
||||
}
|
||||
|
||||
|
||||
@ -248,14 +248,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
@Override
|
||||
public List<String> listTableNames(SessionContext ctx, String dbName) {
|
||||
makeSureInitialized();
|
||||
JdbcExternalDatabase db = (JdbcExternalDatabase) idToDb.get(dbNameToId.get(dbName));
|
||||
if (db != null && db.isInitialized()) {
|
||||
List<String> names = Lists.newArrayList();
|
||||
db.getTables().forEach(table -> names.add(table.getName()));
|
||||
return names;
|
||||
} else {
|
||||
return jdbcClient.getTablesNameList(dbName);
|
||||
}
|
||||
return jdbcClient.getTablesNameList(dbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -265,10 +258,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDefaultPropsWhenCreating(boolean isReplay) throws DdlException {
|
||||
if (isReplay) {
|
||||
return;
|
||||
}
|
||||
public void checkWhenCreating() throws DdlException {
|
||||
super.checkWhenCreating();
|
||||
Map<String, String> properties = catalogProperty.getProperties();
|
||||
if (properties.containsKey(JdbcResource.DRIVER_URL)) {
|
||||
String computedChecksum = JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL));
|
||||
|
||||
@ -35,7 +35,7 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JdbcExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected JdbcExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog);
|
||||
}
|
||||
|
||||
|
||||
@ -37,7 +37,7 @@ public class MaxComputeExternalDatabase extends ExternalDatabase<MaxComputeExter
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MaxComputeExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected MaxComputeExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new MaxComputeExternalTable(tblId, tableName, name, (MaxComputeExternalCatalog) extCatalog);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,111 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.metacache;
|
||||
|
||||
import org.apache.doris.common.CacheFactory;
|
||||
import org.apache.doris.common.util.Util;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
public class MetaCache<T> {
|
||||
private LoadingCache<String, List<String>> namesCache;
|
||||
private Map<Long, String> idToName = Maps.newConcurrentMap();
|
||||
private LoadingCache<String, Optional<T>> metaObjCache;
|
||||
|
||||
private String name;
|
||||
|
||||
public MetaCache(String name,
|
||||
ExecutorService executor,
|
||||
OptionalLong expireAfterWriteSec,
|
||||
OptionalLong refreshAfterWriteSec,
|
||||
long maxSize,
|
||||
CacheLoader<String, List<String>> namesCacheLoader,
|
||||
CacheLoader<String, Optional<T>> metaObjCacheLoader,
|
||||
RemovalListener<String, Optional<T>> removalListener) {
|
||||
this.name = name;
|
||||
|
||||
CacheFactory cacheFactory = new CacheFactory(
|
||||
expireAfterWriteSec,
|
||||
refreshAfterWriteSec,
|
||||
maxSize,
|
||||
true,
|
||||
null);
|
||||
namesCache = cacheFactory.buildCache(namesCacheLoader, null, executor);
|
||||
metaObjCache = cacheFactory.buildCache(metaObjCacheLoader, removalListener, executor);
|
||||
}
|
||||
|
||||
public List<String> listNames() {
|
||||
return namesCache.get("");
|
||||
}
|
||||
|
||||
public Optional<T> getMetaObj(String name) {
|
||||
Optional<T> val = metaObjCache.getIfPresent(name);
|
||||
if (val == null) {
|
||||
synchronized (metaObjCache) {
|
||||
val = metaObjCache.get(name);
|
||||
idToName.put(Util.genTableIdByName(name), name);
|
||||
}
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
public Optional<T> getMetaObjById(long id) {
|
||||
String name = idToName.get(id);
|
||||
return name == null ? Optional.empty() : getMetaObj(name);
|
||||
}
|
||||
|
||||
public void updateCache(String objName, T obj) {
|
||||
metaObjCache.put(objName, Optional.of(obj));
|
||||
namesCache.asMap().compute("", (k, v) -> {
|
||||
if (v == null) {
|
||||
return Lists.newArrayList(objName);
|
||||
} else {
|
||||
v.add(objName);
|
||||
return v;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void invalidate(String objName) {
|
||||
namesCache.asMap().compute("", (k, v) -> {
|
||||
if (v == null) {
|
||||
return Lists.newArrayList();
|
||||
} else {
|
||||
v.remove(objName);
|
||||
return v;
|
||||
}
|
||||
});
|
||||
metaObjCache.invalidate(objName);
|
||||
}
|
||||
|
||||
public void invalidateAll() {
|
||||
namesCache.invalidateAll();
|
||||
metaObjCache.invalidateAll();
|
||||
}
|
||||
|
||||
}
|
||||
@ -28,7 +28,7 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PaimonExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected PaimonExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog);
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -44,9 +45,14 @@ public class TestExternalCatalog extends ExternalCatalog {
|
||||
String comment) {
|
||||
super(catalogId, name, InitCatalogLog.Type.TEST, comment);
|
||||
this.catalogProperty = new CatalogProperty(resource, props);
|
||||
initCatalogProvider();
|
||||
}
|
||||
|
||||
private void initCatalogProvider() {
|
||||
String providerClass = this.catalogProperty.getProperties().get("catalog_provider.class");
|
||||
Class<?> providerClazz = null;
|
||||
try {
|
||||
providerClazz = Class.forName(props.get("catalog_provider.class"));
|
||||
providerClazz = Class.forName(providerClass);
|
||||
this.catalogProvider = (TestCatalogProvider) providerClazz.newInstance();
|
||||
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -81,14 +87,7 @@ public class TestExternalCatalog extends ExternalCatalog {
|
||||
@Override
|
||||
public List<String> listTableNames(SessionContext ctx, String dbName) {
|
||||
makeSureInitialized();
|
||||
TestExternalDatabase db = (TestExternalDatabase) idToDb.get(dbNameToId.get(dbName));
|
||||
if (db != null && db.isInitialized()) {
|
||||
List<String> names = Lists.newArrayList();
|
||||
db.getTables().stream().forEach(table -> names.add(table.getName()));
|
||||
return names;
|
||||
} else {
|
||||
return mockedTableNames(dbName);
|
||||
}
|
||||
return mockedTableNames(dbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -107,4 +106,11 @@ public class TestExternalCatalog extends ExternalCatalog {
|
||||
// db name -> (tbl name -> schema)
|
||||
Map<String, Map<String, List<Column>>> getMetadata();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void gsonPostProcess() throws IOException {
|
||||
super.gsonPostProcess();
|
||||
initCatalogProvider();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -28,7 +28,7 @@ public class TestExternalDatabase extends ExternalDatabase<TestExternalTable> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TestExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
protected TestExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
|
||||
return new TestExternalTable(tblId, tableName, name, (TestExternalCatalog) extCatalog);
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,6 +59,8 @@ public final class GlobalVariable {
|
||||
public static final String ENABLE_GET_ROW_COUNT_FROM_FILE_LIST = "enable_get_row_count_from_file_list";
|
||||
public static final String READ_ONLY = "read_only";
|
||||
public static final String SUPER_READ_ONLY = "super_read_only";
|
||||
public static final String DEFAULT_USING_META_CACHE_FOR_EXTERNAL_CATALOG
|
||||
= "default_using_meta_cache_for_external_catalog";
|
||||
|
||||
@VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY)
|
||||
public static String versionComment = "Doris version "
|
||||
|
||||
@ -583,14 +583,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
try {
|
||||
List<TableIf> tables;
|
||||
if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) {
|
||||
tables = db.getTablesOrEmpty();
|
||||
tables = db.getTablesIgnoreException();
|
||||
} else {
|
||||
switch (params.getType()) {
|
||||
case "VIEW":
|
||||
tables = db.getViewsOrEmpty();
|
||||
break;
|
||||
default:
|
||||
tables = db.getTablesOrEmpty();
|
||||
tables = db.getTablesIgnoreException();
|
||||
}
|
||||
}
|
||||
for (TableIf table : tables) {
|
||||
@ -1968,7 +1968,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
throw new UserException("unknown database, database=" + dbName);
|
||||
}
|
||||
// todo Whether there will be a large amount of data risk
|
||||
List<Table> tables = db.getTablesOrEmpty();
|
||||
List<Table> tables = db.getTablesIgnoreException();
|
||||
if (CollectionUtils.isEmpty(tables)) {
|
||||
throw new MetaNotFoundException("table not found");
|
||||
}
|
||||
|
||||
@ -127,16 +127,20 @@ public class StatisticsCleaner extends MasterDaemon {
|
||||
|
||||
private Map<Long, DatabaseIf> constructDbMap() {
|
||||
Map<Long, DatabaseIf> idToDb = Maps.newHashMap();
|
||||
for (CatalogIf ctl : idToCatalog.values()) {
|
||||
idToDb.putAll(ctl.getIdToDb());
|
||||
for (CatalogIf<? extends DatabaseIf> ctl : idToCatalog.values()) {
|
||||
for (DatabaseIf db : ctl.getAllDbs()) {
|
||||
idToDb.put(db.getId(), db);
|
||||
}
|
||||
}
|
||||
return idToDb;
|
||||
}
|
||||
|
||||
private Map<Long, TableIf> constructTblMap() {
|
||||
Map<Long, TableIf> idToTbl = new HashMap<>();
|
||||
for (DatabaseIf db : idToDb.values()) {
|
||||
idToTbl.putAll(db.getIdToTable());
|
||||
for (DatabaseIf<? extends TableIf> db : idToDb.values()) {
|
||||
for (TableIf tbl : db.getTables()) {
|
||||
idToTbl.put(tbl.getId(), tbl);
|
||||
}
|
||||
}
|
||||
return idToTbl;
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ public class DataBaseStats {
|
||||
public Map<String, Map> getStats(boolean summary) {
|
||||
Map<String, Map> stat = new HashMap<>();
|
||||
Map<String, Map> dstat = new HashMap<>();
|
||||
List<TableIf> tables = db.getTablesOrEmpty();
|
||||
List<TableIf> tables = db.getTablesIgnoreException();
|
||||
stat.put("summary", ImmutableMap.of("query", getQueryStats()));
|
||||
|
||||
for (TableIf table : tables) {
|
||||
|
||||
Reference in New Issue
Block a user