[feature-wip](multi-catalog)(step1)support connect to max compute (#19606)

Issue Number: #19679

support connect to max compute metadata by odps sdk
This commit is contained in:
slothever
2023-05-16 11:30:27 +08:00
committed by GitHub
parent 9cd7005dec
commit 3f2d1ae9a4
30 changed files with 715 additions and 872 deletions

View File

@ -134,7 +134,7 @@ public interface TableIf {
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE;
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {

View File

@ -17,35 +17,15 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Elasticsearch metastore external database.
*/
public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
private Map<Long, EsExternalTable> idToTbl = Maps.newConcurrentMap();
/**
* Create Elasticsearch external database.
@ -55,106 +35,12 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implem
* @param name database name.
*/
public EsExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name);
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
EsExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
EsExternalTable table = new EsExternalTable(log.getCreateTableIds().get(i),
log.getCreateTableNames().get(i), name, (EsExternalCatalog) catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (EsExternalTable table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
super(extCatalog, id, name, InitDatabaseLog.Type.ES);
}
@Override
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.ES);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, EsExternalTable> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
EsExternalTable table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
EsExternalTable table = new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public List<EsExternalTable> getTables() {
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
}
@Override
public EsExternalTable getTableNullable(String tableName) {
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public EsExternalTable getTableNullable(long tableId) {
makeSureInitialized();
return idToTbl.get(tableId);
}
public EsExternalTable getTableForReplay(long tableId) {
return idToTbl.get(tableId);
}
@Override
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (EsExternalTable tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
protected EsExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog);
}
public void addTableForTest(EsExternalTable tbl) {

View File

@ -26,9 +26,6 @@ import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
@ -36,7 +33,6 @@ import java.util.List;
*/
public class EsExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(EsExternalTable.class);
private EsTable esTable;
/**

View File

@ -32,6 +32,9 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
@ -41,6 +44,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -50,8 +54,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* @param <T> External table type is ExternalTable or its subclass.
*/
public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, Writable, GsonPostProcessable {
public abstract class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class);
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
@ -64,16 +68,14 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
protected DatabaseProperty dbProperties = new DatabaseProperty();
@SerializedName(value = "initialized")
protected boolean initialized = false;
// Cache of table name to table id.
protected Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
protected Map<Long, T> idToTbl = Maps.newConcurrentMap();
protected final InitDatabaseLog.Type dbLogType;
protected ExternalCatalog extCatalog;
protected boolean invalidCacheInInit = true;
/**
* No args constructor for persist.
*/
public ExternalDatabase() {
initialized = false;
}
/**
* Create external database.
*
@ -81,10 +83,11 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
* @param id Database id.
* @param name Database name.
*/
public ExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
public ExternalDatabase(ExternalCatalog extCatalog, long id, String name, InitDatabaseLog.Type dbLogType) {
this.extCatalog = extCatalog;
this.id = id;
this.name = name;
this.dbLogType = dbLogType;
}
public void setExtCatalog(ExternalCatalog extCatalog) {
@ -92,6 +95,9 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (T table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
}
public void setUnInitialized(boolean invalidCache) {
@ -125,16 +131,61 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
}
}
protected void init() {
throw new NotImplementedException("init() is not implemented");
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
T table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(dbLogType);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, T> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
T table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
T table = getExternalTable(tableName, tblId, extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
protected abstract T getExternalTable(String tableName, long tblId, ExternalCatalog catalog);
public T getTableForReplay(long tableId) {
throw new NotImplementedException("getTableForReplay() is not implemented");
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
throw new NotImplementedException("replayInitDb() is not implemented");
return idToTbl.get(tableId);
}
@Override
@ -210,7 +261,8 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
@Override
public List<T> getTables() {
throw new NotImplementedException("getTables() is not implemented");
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
}
@Override
@ -235,17 +287,23 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
@Override
public Set<String> getTableNamesWithLock() {
throw new NotImplementedException("getTableNamesWithLock() is not implemented");
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public T getTableNullable(String tableName) {
throw new NotImplementedException("getTableNullable() is not implemented");
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public T getTableNullable(long tableId) {
throw new NotImplementedException("getTableNullable() is not implemented");
makeSureInitialized();
return idToTbl.get(tableId);
}
@Override
@ -253,13 +311,20 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
@SuppressWarnings("rawtypes")
public static ExternalDatabase read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ExternalDatabase.class);
}
@Override
public void gsonPostProcess() throws IOException {}
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (T tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
}
@Override
public void dropTable(String tableName) {

View File

@ -67,7 +67,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@SerializedName(value = "dbName")
protected String dbName;
protected boolean objectCreated = false;
protected boolean objectCreated;
protected ExternalCatalog catalog;
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

View File

@ -17,26 +17,17 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -45,11 +36,6 @@ import java.util.stream.Collectors;
public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(HMSExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
private Map<Long, HMSExternalTable> idToTbl = Maps.newConcurrentMap();
/**
* Create HMS external database.
*
@ -58,73 +44,12 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
* @param name database name.
*/
public HMSExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name);
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
HMSExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
HMSExternalTable table = new HMSExternalTable(log.getCreateTableIds().get(i),
log.getCreateTableNames().get(i), name, (HMSExternalCatalog) catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (HMSExternalTable table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
super(extCatalog, id, name, InitDatabaseLog.Type.HMS);
}
@Override
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
HMSExternalTable table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
HMSExternalTable table = new HMSExternalTable(tblId, tableName, name,
(HMSExternalCatalog) extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
@Override
public List<HMSExternalTable> getTables() {
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
protected HMSExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog);
}
@Override
@ -133,40 +58,6 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
}
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public HMSExternalTable getTableNullable(String tableName) {
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public HMSExternalTable getTableNullable(long tableId) {
makeSureInitialized();
return idToTbl.get(tableId);
}
public HMSExternalTable getTableForReplay(long tableId) {
return idToTbl.get(tableId);
}
@Override
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (HMSExternalTable tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
}
public void addTableForTest(HMSExternalTable tbl) {
idToTbl.put(tbl.getId(), tbl);
tableNameToId.put(tbl.getName(), tbl.getId());
@ -188,8 +79,7 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
LOG.debug("create table [{}]", tableName);
makeSureInitialized();
tableNameToId.put(tableName, tableId);
HMSExternalTable table = new HMSExternalTable(tableId, tableName, name,
(HMSExternalCatalog) extCatalog);
HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog);
idToTbl.put(tableId, table);
}
}

View File

@ -17,104 +17,30 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTable> implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(IcebergExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
private Map<Long, IcebergExternalTable> idToTbl = Maps.newConcurrentMap();
public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String name) {
super(extCatalog, id, name);
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
IcebergExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
IcebergExternalTable table = new IcebergExternalTable(log.getCreateTableIds().get(i),
log.getCreateTableNames().get(i), name, (IcebergExternalCatalog) catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (IcebergExternalTable table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
super(extCatalog, id, name, InitDatabaseLog.Type.ICEBERG);
}
@Override
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
IcebergExternalTable table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
IcebergExternalTable table = new IcebergExternalTable(tblId, tableName, name,
(IcebergExternalCatalog) extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
@Override
public List<IcebergExternalTable> getTables() {
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
protected IcebergExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new IcebergExternalTable(tblId, tableName, name, (IcebergExternalCatalog) extCatalog);
}
@Override
@ -123,41 +49,6 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
}
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public IcebergExternalTable getTableNullable(String tableName) {
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public IcebergExternalTable getTableNullable(long tableId) {
makeSureInitialized();
return idToTbl.get(tableId);
}
@Override
public IcebergExternalTable getTableForReplay(long tableId) {
return idToTbl.get(tableId);
}
@Override
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (IcebergExternalTable tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
}
@Override
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);

View File

@ -17,32 +17,12 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(JdbcExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
private Map<Long, JdbcExternalTable> idToTbl = Maps.newConcurrentMap();
/**
* Create Jdbc external database.
@ -52,107 +32,12 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> im
* @param name database name.
*/
public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name);
super(extCatalog, id, name, InitDatabaseLog.Type.JDBC);
}
@Override
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.JDBC);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, JdbcExternalTable> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
JdbcExternalTable table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
JdbcExternalTable table = new JdbcExternalTable(tblId, tableName, name,
(JdbcExternalCatalog) extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (JdbcExternalTable table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, JdbcExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
JdbcExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
JdbcExternalTable table = new JdbcExternalTable(log.getCreateTableIds().get(i),
log.getCreateTableNames().get(i), name, (JdbcExternalCatalog) catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public List<JdbcExternalTable> getTables() {
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
}
@Override
public JdbcExternalTable getTableNullable(String tableName) {
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public JdbcExternalTable getTableNullable(long tableId) {
makeSureInitialized();
return idToTbl.get(tableId);
}
public JdbcExternalTable getTableForReplay(long tableId) {
return idToTbl.get(tableId);
}
@Override
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (JdbcExternalTable tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
protected JdbcExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog);
}
public void addTableForTest(JdbcExternalTable tbl) {

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.external;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
/**
* MaxCompute external database.
*/
public class MaxComputeExternalDatabase extends ExternalDatabase<MaxComputeExternalTable>
implements GsonPostProcessable {
/**
* Create MaxCompute external database.
*
* @param extCatalog External catalog this database belongs to.
* @param id database id.
* @param name database name.
*/
public MaxComputeExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name, InitDatabaseLog.Type.MAX_COMPUTE);
}
@Override
protected MaxComputeExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new MaxComputeExternalTable(tblId, tableName, name, (MaxComputeExternalCatalog) extCatalog);
}
}

View File

@ -0,0 +1,176 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
/**
* MaxCompute external table.
*/
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
}
@Override
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
objectCreated = true;
}
}
@Override
public List<Column> initSchema() {
makeSureInitialized();
List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
List<Column> result = Lists.newArrayListWithCapacity(columns.size());
for (com.aliyun.odps.Column field : columns) {
result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
return result;
}
private Type mcTypeToDorisType(TypeInfo typeInfo) {
OdpsType odpsType = typeInfo.getOdpsType();
switch (odpsType) {
case VOID: {
return Type.NULL;
}
case BOOLEAN: {
return Type.BOOLEAN;
}
case TINYINT: {
return Type.TINYINT;
}
case SMALLINT: {
return Type.SMALLINT;
}
case INT: {
return Type.INT;
}
case BIGINT: {
return Type.BIGINT;
}
case CHAR: {
return Type.CHAR;
}
case VARCHAR: {
return Type.VARCHAR;
}
case STRING: {
return Type.STRING;
}
case JSON: {
return Type.UNSUPPORTED;
// return Type.JSONB;
}
case FLOAT: {
return Type.FLOAT;
}
case DOUBLE: {
return Type.DOUBLE;
}
case DECIMAL: {
DecimalTypeInfo decimal = (DecimalTypeInfo) typeInfo;
return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
}
case DATE: {
return ScalarType.createDateV2Type();
}
case DATETIME:
case TIMESTAMP: {
return ScalarType.createDatetimeV2Type(3);
}
case ARRAY: {
ArrayTypeInfo arrayType = (ArrayTypeInfo) typeInfo;
Type innerType = mcTypeToDorisType(arrayType.getElementTypeInfo());
return ArrayType.create(innerType, true);
}
case MAP: {
MapTypeInfo mapType = (MapTypeInfo) typeInfo;
return new MapType(mcTypeToDorisType(mapType.getKeyTypeInfo()),
mcTypeToDorisType(mapType.getValueTypeInfo()));
}
case STRUCT: {
ArrayList<StructField> fields = new ArrayList<>();
StructTypeInfo structType = (StructTypeInfo) typeInfo;
List<String> fieldNames = structType.getFieldNames();
List<TypeInfo> fieldTypeInfos = structType.getFieldTypeInfos();
for (int i = 0; i < structType.getFieldCount(); i++) {
Type innerType = mcTypeToDorisType(fieldTypeInfos.get(i));
fields.add(new StructField(fieldNames.get(i), innerType));
}
return new StructType(fields);
}
case BINARY:
case INTERVAL_DAY_TIME:
case INTERVAL_YEAR_MONTH:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + odpsType);
}
}
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
TMCTable tMcTable = new TMCTable();
tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl());
// use mc project as dbName
tMcTable.setProject(dbName);
tMcTable.setTable(name);
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE,
schema.size(), 0, getName(), dbName);
tTableDescriptor.setMcTable(tMcTable);
return tTableDescriptor;
}
@Override
public String getMysqlType() {
return "BASE TABLE";
}
}

View File

@ -17,134 +17,19 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.test.TestExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestExternalDatabase extends ExternalDatabase<TestExternalTable> implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(TestExternalDatabase.class);
// Cache of table name to table id.
private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
@SerializedName(value = "idToTbl")
private Map<Long, TestExternalTable> idToTbl = Maps.newConcurrentMap();
public TestExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name);
super(extCatalog, id, name, InitDatabaseLog.Type.TEST);
}
@Override
protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.TEST);
initDatabaseLog.setCatalogId(extCatalog.getId());
initDatabaseLog.setDbId(id);
List<String> tableNames = extCatalog.listTableNames(null, name);
if (tableNames != null) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, TestExternalTable> tmpIdToTbl = Maps.newHashMap();
for (String tableName : tableNames) {
long tblId;
if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
TestExternalTable table = idToTbl.get(tblId);
table.unsetObjectCreated();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
tblId = Env.getCurrentEnv().getNextId();
tmpTableNameToId.put(tableName, tblId);
TestExternalTable table = new TestExternalTable(tblId, tableName, name,
(TestExternalCatalog) extCatalog);
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addCreateTable(tblId, tableName);
}
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
}
initialized = true;
Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
for (TestExternalTable table : idToTbl.values()) {
table.setCatalog(extCatalog);
}
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
Map<Long, TestExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
TestExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
for (int i = 0; i < log.getCreateCount(); i++) {
TestExternalTable table = new TestExternalTable(log.getCreateTableIds().get(i),
log.getCreateTableNames().get(i), name, (TestExternalCatalog) catalog);
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
tableNameToId = tmpTableNameToId;
idToTbl = tmpIdToTbl;
initialized = true;
}
@Override
public Set<String> getTableNamesWithLock() {
makeSureInitialized();
return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public List<TestExternalTable> getTables() {
makeSureInitialized();
return Lists.newArrayList(idToTbl.values());
}
@Override
public TestExternalTable getTableNullable(String tableName) {
makeSureInitialized();
if (!tableNameToId.containsKey(tableName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public TestExternalTable getTableNullable(long tableId) {
makeSureInitialized();
return idToTbl.get(tableId);
}
public TestExternalTable getTableForReplay(long tableId) {
return idToTbl.get(tableId);
}
@Override
public void gsonPostProcess() throws IOException {
tableNameToId = Maps.newConcurrentMap();
for (TestExternalTable tbl : idToTbl.values()) {
tableNameToId.put(tbl.getName(), tbl.getId());
}
rwLock = new ReentrantReadWriteLock(true);
protected TestExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new TestExternalTable(tblId, tableName, name, (TestExternalCatalog) extCatalog);
}
}

View File

@ -113,6 +113,9 @@ public class CatalogFactory {
case "iceberg":
catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props);
break;
case "max_compute":
catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props);
break;
case "test":
if (!FeConstants.runningUnitTest) {
throw new DdlException("test catalog is only for FE unit test");

View File

@ -20,6 +20,8 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.external.elasticsearch.EsRestClient;
import com.google.common.collect.Lists;
@ -29,7 +31,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -47,7 +48,7 @@ public class EsExternalCatalog extends ExternalCatalog {
* Default constructor for EsExternalCatalog.
*/
public EsExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
super(catalogId, name, InitCatalogLog.Type.ES);
this.type = "es";
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
}
@ -124,7 +125,7 @@ public class EsExternalCatalog extends ExternalCatalog {
protected void init() {
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.ES);
initCatalogLog.setType(logType);
if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) {
idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(invalidCacheInInit);
initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB));
@ -133,19 +134,13 @@ public class EsExternalCatalog extends ExternalCatalog {
idToDb = Maps.newConcurrentMap();
long defaultDbId = Env.getCurrentEnv().getNextId();
dbNameToId.put(DEFAULT_DB, defaultDbId);
EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId, DEFAULT_DB);
ExternalDatabase<? extends ExternalTable> db = getDbForInit(DEFAULT_DB, defaultDbId, logType);
idToDb.put(defaultDbId, db);
initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB);
}
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return new ArrayList<>(dbNameToId.keySet());
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();

View File

@ -26,6 +26,8 @@ import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
@ -51,6 +53,7 @@ import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -59,7 +62,8 @@ import java.util.Optional;
* The abstract class for all types of external catalogs.
*/
@Data
public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Writable, GsonPostProcessable {
public abstract class ExternalCatalog
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
// Unique id of this catalog, will be assigned after catalog is loaded.
@ -69,13 +73,15 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
protected String name;
@SerializedName(value = "type")
protected String type;
// TODO: replace type with log type
protected final InitCatalogLog.Type logType;
// save properties of this catalog, such as hive meta store url.
@SerializedName(value = "catalogProperty")
protected CatalogProperty catalogProperty;
@SerializedName(value = "initialized")
private boolean initialized = false;
@SerializedName(value = "idToDb")
protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
protected Map<Long, ExternalDatabase<? extends ExternalTable>> idToDb = Maps.newConcurrentMap();
// db name does not contains "default_cluster"
protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
@ -84,15 +90,25 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
private ExternalSchemaCache schemaCache;
private String comment;
public ExternalCatalog(long catalogId, String name) {
public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType) {
this.id = catalogId;
this.name = name;
this.logType = logType;
}
protected List<String> listDatabaseNames() {
throw new UnsupportedOperationException("Unsupported operation: "
+ "listDatabaseNames from remote client when init catalog with " + logType.name());
}
/**
* @return names of database in this catalog.
*/
public abstract List<String> listDatabaseNames(SessionContext ctx);
// public abstract List<String> listDatabaseNames(SessionContext ctx);
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return new ArrayList<>(dbNameToId.keySet());
}
/**
* @param dbName
@ -204,7 +220,43 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
}
// init schema related objects
protected abstract void init();
protected void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(logType);
List<String> allDatabases = listDatabaseNames();
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
for (String dbName : allDatabases) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
continue;
}
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
continue;
}
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase<? extends ExternalTable> db = idToDb.get(dbId);
db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
public void setUninitialized(boolean invalidCache) {
this.objectCreated = false;
@ -219,17 +271,13 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
}
public ExternalDatabase getDbForReplay(long dbId) {
return idToDb.get(dbId);
}
public final List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
Optional<ExternalDatabase> db = getDb(dbName);
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
if (db.isPresent()) {
Optional table = db.get().getTable(tblName);
Optional<? extends ExternalTable> table = db.get().getTable(tblName);
if (table.isPresent()) {
return ((ExternalTable) table.get()).initSchema();
return table.get().initSchema();
}
}
// return one column with unsupported type.
@ -287,7 +335,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
@Nullable
@Override
public ExternalDatabase getDbNullable(String dbName) {
public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
try {
makeSureInitialized();
} catch (Exception e) {
@ -303,7 +351,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
@Nullable
@Override
public ExternalDatabase getDbNullable(long dbId) {
public ExternalDatabase<? extends ExternalTable> getDbNullable(long dbId) {
try {
makeSureInitialized();
} catch (Exception e) {
@ -358,54 +406,51 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
public void replayInitCatalog(InitCatalogLog log) {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i));
ExternalDatabase<? extends ExternalTable> db = getDbForReplay(log.getRefreshDbIds().get(i));
db.setUnInitialized(invalidCacheInInit);
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
switch (log.getType()) {
case HMS:
for (int i = 0; i < log.getCreateCount(); i++) {
HMSExternalDatabase db = new HMSExternalDatabase(
this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
break;
case ES:
for (int i = 0; i < log.getCreateCount(); i++) {
EsExternalDatabase db = new EsExternalDatabase(
this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
break;
case JDBC:
for (int i = 0; i < log.getCreateCount(); i++) {
JdbcExternalDatabase db = new JdbcExternalDatabase(
this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
break;
case ICEBERG:
for (int i = 0; i < log.getCreateCount(); i++) {
IcebergExternalDatabase db = new IcebergExternalDatabase(
this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
break;
default:
break;
for (int i = 0; i < log.getCreateCount(); i++) {
ExternalDatabase<? extends ExternalTable> db =
getDbForInit(log.getCreateDbNames().get(i), log.getCreateDbIds().get(i), log.getType());
if (db != null) {
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
initialized = true;
}
public ExternalDatabase<? extends ExternalTable> getDbForReplay(long dbId) {
return idToDb.get(dbId);
}
protected ExternalDatabase<? extends ExternalTable> getDbForInit(String dbName, long dbId,
InitCatalogLog.Type logType) {
switch (logType) {
case HMS:
return new HMSExternalDatabase(this, dbId, dbName);
case ES:
return new EsExternalDatabase(this, dbId, dbName);
case JDBC:
return new JdbcExternalDatabase(this, dbId, dbName);
case ICEBERG:
return new IcebergExternalDatabase(this, dbId, dbName);
case MAX_COMPUTE:
return new MaxComputeExternalDatabase(this, dbId, dbName);
case TEST:
return new TestExternalDatabase(this, dbId, dbName);
default:
break;
}
return null;
}
/**
* External catalog has no cluster semantics.
*/
@ -425,7 +470,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
idToDb = Maps.newConcurrentMap();
}
dbNameToId = Maps.newConcurrentMap();
for (ExternalDatabase db : idToDb.values()) {
for (ExternalDatabase<? extends ExternalTable> db : idToDb.values()) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
db.setExtCatalog(this);
db.setTableExtCatalog(this);
@ -436,7 +481,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
}
}
public void addDatabaseForTest(ExternalDatabase db) {
public void addDatabaseForTest(ExternalDatabase<? extends ExternalTable> db) {
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
@ -449,15 +494,15 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
throw new NotImplementedException("createDatabase not implemented");
}
public Map getIncludeDatabaseMap() {
public Map<String, Boolean> getIncludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.INCLUDE_DATABASE_LIST);
}
public Map getExcludeDatabaseMap() {
public Map<String, Boolean> getExcludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.EXCLUDE_DATABASE_LIST);
}
public Map getSpecifiedDatabaseMap(String catalogPropertyKey) {
public Map<String, Boolean> getSpecifiedDatabaseMap(String catalogPropertyKey) {
String specifiedDatabaseList = catalogProperty.getOrDefault(catalogPropertyKey, "");
Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
specifiedDatabaseList = specifiedDatabaseList.trim();
@ -465,8 +510,8 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
return specifiedDatabaseMap;
}
String[] databaseList = specifiedDatabaseList.split(",");
for (int i = 0; i < databaseList.length; i++) {
String dbname = databaseList[i].trim();
for (String database : databaseList) {
String dbname = database.trim();
if (!dbname.isEmpty()) {
specifiedDatabaseMap.put(dbname, true);
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -31,7 +32,6 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@ -70,7 +70,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
* Default constructor for HMSExternalCatalog.
*/
public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
super(catalogId, name, InitCatalogLog.Type.HMS);
this.type = "hms";
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
@ -122,44 +122,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
}
@Override
protected void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.HMS);
List<String> allDatabases = client.getAllDatabases();
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
// Update the db name to id map.
for (String dbName : allDatabases) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
continue;
}
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
continue;
}
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
protected List<String> listDatabaseNames() {
return client.getAllDatabases();
}
@Override
@ -193,12 +157,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE);
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return Lists.newArrayList(dbNameToId.keySet());
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
@ -292,7 +250,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
makeSureInitialized();
LOG.debug("create database [{}]", dbName);
dbNameToId.put(dbName, dbId);
HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
idToDb.put(dbId, db);
}

View File

@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable {
ES,
JDBC,
ICEBERG,
MAX_COMPUTE,
TEST,
UNKNOWN;
}

View File

@ -34,8 +34,10 @@ import java.util.List;
public class InitDatabaseLog implements Writable {
public enum Type {
HMS,
ICEBERG,
ES,
JDBC,
MAX_COMPUTE,
TEST,
UNKNOWN;
}

View File

@ -17,9 +17,7 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.common.DdlException;
import org.apache.doris.external.jdbc.JdbcClient;
@ -51,7 +49,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props)
throws DdlException {
super(catalogId, name);
super(catalogId, name, InitCatalogLog.Type.JDBC);
this.type = "jdbc";
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
}
@ -140,40 +138,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap());
}
@Override
protected void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.JDBC);
List<String> allDatabaseNames = jdbcClient.getDatabaseNameList();
for (String dbName : allDatabaseNames) {
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
JdbcExternalDatabase db = new JdbcExternalDatabase(this, dbId, dbName);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return Lists.newArrayList(dbNameToId.keySet());
protected List<String> listDatabaseNames() {
return jdbcClient.getDatabaseNameList();
}
@Override
@ -182,7 +148,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcExternalDatabase db = (JdbcExternalDatabase) idToDb.get(dbNameToId.get(dbName));
if (db != null && db.isInitialized()) {
List<String> names = Lists.newArrayList();
db.getTables().stream().forEach(table -> names.add(table.getName()));
db.getTables().forEach(table -> names.add(table.getName()));
return names;
} else {
return jdbcClient.getTablesNameList(dbName);

View File

@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MaxComputeExternalCatalog extends ExternalCatalog {
private Odps odps;
private String tunnelUrl;
private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com";
public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE);
this.type = "max_compute";
catalogProperty = new CatalogProperty(resource, props);
}
@Override
protected void initLocalObjectsImpl() {
Map<String, String> props = catalogProperty.getProperties();
String region = props.get(MCProperties.REGION);
String defaultProject = props.get(MCProperties.PROJECT);
if (Strings.isNullOrEmpty(region)) {
throw new IllegalArgumentException("Missing required property '" + MCProperties.REGION + "'.");
}
if (Strings.isNullOrEmpty(defaultProject)) {
throw new IllegalArgumentException("Missing required property '" + MCProperties.PROJECT + "'.");
}
if (region.startsWith("oss-")) {
// may use oss-cn-beijing, ensure compatible
region = region.replace("oss-", "");
}
this.tunnelUrl = tunnelUrlTemplate.replace("{}", region);
CloudCredential credential = MCProperties.getCredential(props);
Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey());
this.odps = new Odps(account);
odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
odps.setDefaultProject(defaultProject);
}
public Odps getClient() {
makeSureInitialized();
return odps;
}
protected List<String> listDatabaseNames() {
List<String> result = new ArrayList<>();
try {
result.add(odps.projects().get(odps.getDefaultProject()).getName());
} catch (OdpsException e) {
throw new RuntimeException(e);
}
return result;
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
try {
return odps.tables().exists(tblName);
} catch (OdpsException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
List<String> result = new ArrayList<>();
odps.tables().forEach(e -> result.add(e.getName()));
return result;
}
/**
* data tunnel url
* @return tunnelUrl, required by jni scanner.
*/
public String getTunnelUrl() {
makeSureInitialized();
return tunnelUrl;
}
}

View File

@ -17,9 +17,6 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.Util;
@ -27,7 +24,6 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.catalog.Catalog;
@ -37,7 +33,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -55,47 +50,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected SupportsNamespaces nsCatalog;
public IcebergExternalCatalog(long catalogId, String name) {
super(catalogId, name);
super(catalogId, name, InitCatalogLog.Type.ICEBERG);
}
@Override
protected void init() {
nsCatalog = (SupportsNamespaces) catalog;
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
List<String> allDatabaseNames = listDatabaseNames();
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
for (String dbName : allDatabaseNames) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
continue;
}
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
continue;
}
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
super.init();
}
protected Configuration getConfiguration() {
@ -137,12 +98,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
.collect(Collectors.toList());
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return new ArrayList<>(dbNameToId.keySet());
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();

View File

@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Map;
/**
* properties for aliyun max compute
*/
public class MCProperties extends BaseProperties {
public static final String REGION = "mc.region";
public static final String PROJECT = "mc.default.project";
public static final String ACCESS_KEY = "mc.access_key";
public static final String SECRET_KEY = "mc.secret_key";
public static final String SESSION_TOKEN = "mc.session_token";
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -18,8 +18,6 @@
package org.apache.doris.datasource.test;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@ -27,7 +25,6 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -45,18 +42,14 @@ public class TestExternalCatalog extends ExternalCatalog {
private TestCatalogProvider catalogProvider;
public TestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
super(catalogId, name, InitCatalogLog.Type.TEST);
this.type = "test";
this.catalogProperty = new CatalogProperty(resource, props);
Class<?> providerClazz = null;
try {
providerClazz = Class.forName(props.get("catalog_provider.class"));
this.catalogProvider = (TestCatalogProvider) providerClazz.newInstance();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@ -65,38 +58,7 @@ public class TestExternalCatalog extends ExternalCatalog {
protected void initLocalObjectsImpl() {
}
@Override
protected void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.TEST);
List<String> allDatabaseNames = mockedDatabaseNames();
for (String dbName : allDatabaseNames) {
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(dbName, dbId);
TestExternalDatabase db = new TestExternalDatabase(this, dbId, dbName);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, dbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
private List<String> mockedDatabaseNames() {
protected List<String> listDatabaseNames() {
return Lists.newArrayList(catalogProvider.getMetadata().keySet());
}
@ -117,12 +79,6 @@ public class TestExternalCatalog extends ExternalCatalog {
return catalogProvider.getMetadata().get(dbName).get(tblName);
}
@Override
public List<String> listDatabaseNames(SessionContext ctx) {
makeSureInitialized();
return Lists.newArrayList(dbNameToId.keySet());
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();

View File

@ -47,11 +47,14 @@ import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
@ -183,7 +186,8 @@ public class GsonUtils {
.registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName());
.registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
@ -191,14 +195,16 @@ public class GsonUtils {
.registerSubtype(EsExternalDatabase.class, EsExternalDatabase.class.getSimpleName())
.registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
.registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
.registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName());
.registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
.registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
TableIf.class, "clazz").registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
.registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName())
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
.registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName());
.registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
.registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
// runtime adapter for class "HeartbeatResponse"
private static RuntimeTypeAdapterFactory<HeartbeatResponse> hbResponseTypeAdapterFactory

View File

@ -71,9 +71,11 @@ import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.HudiScanNode;
import org.apache.doris.planner.external.MaxComputeScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TNullSide;
import org.apache.doris.thrift.TPushAggOp;
@ -2020,6 +2022,11 @@ public class SingleNodePlanner {
case ICEBERG_EXTERNAL_TABLE:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case MAX_COMPUTE_EXTERNAL_TABLE:
// TODO: support max compute scan node
scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
StatisticalType.MAX_COMPUTE_SCAN_NODE, true);
break;
case ES_EXTERNAL_TABLE:
scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true);
break;

View File

@ -325,13 +325,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
return rangeDesc;
}
protected TFileType getLocationType() throws UserException {
throw new NotImplementedException("");
}
protected abstract TFileType getLocationType() throws UserException;
protected TFileFormatType getFileFormatType() throws UserException {
throw new NotImplementedException("");
}
protected abstract TFileFormatType getFileFormatType() throws UserException;
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
@ -341,17 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
throw new NotImplementedException("");
}
protected List<String> getPathPartitionKeys() throws UserException {
throw new NotImplementedException("");
}
protected abstract List<String> getPathPartitionKeys() throws UserException;
protected TableIf getTargetTable() throws UserException {
throw new NotImplementedException("");
}
protected abstract TableIf getTargetTable() throws UserException;
protected Map<String, String> getLocationProperties() throws UserException {
throw new NotImplementedException("");
}
protected abstract Map<String, String> getLocationProperties() throws UserException;
// eg: hdfs://namenode s3://buckets
protected String getFsName(FileSplit split) {

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MaxComputeScanNode extends FileQueryScanNode {
private final MaxComputeExternalTable table;
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
table = (MaxComputeExternalTable) desc.getTable();
}
@Override
protected TFileType getLocationType() throws UserException {
return TFileType.FILE_STREAM;
}
@Override
public TFileFormatType getFileFormatType() {
// TODO: use max compute format
return TFileFormatType.FORMAT_PARQUET;
}
@Override
public List<String> getPathPartitionKeys() {
return Collections.emptyList();
}
@Override
protected TableIf getTargetTable() throws UserException {
return table;
}
@Override
protected Map<String, String> getLocationProperties() throws UserException {
return new HashMap<>();
}
@Override
protected List<Split> getSplits() throws UserException {
List<Split> result = new ArrayList<>();
result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList()));
return result;
}
}

View File

@ -48,6 +48,7 @@ public enum StatisticalType {
UNION_NODE,
TABLE_VALUED_FUNCTION_NODE,
FILE_SCAN_NODE,
MAX_COMPUTE_SCAN_NODE,
METADATA_SCAN_NODE,
JDBC_SCAN_NODE,
TEST_EXTERNAL_TABLE,