diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 779f3fab67..643ef1c7f4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -407,6 +407,11 @@ under the License.
+
+ com.aliyun.odps
+ odps-sdk-core
+ 0.43.3-public
+
org.springframework.boot
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 25c2319d5b..de3f4e917b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,7 @@ public interface TableIf {
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
- ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE;
+ ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
index b69041952e..5d020789b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -17,35 +17,15 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* Elasticsearch metastore external database.
*/
public class EsExternalDatabase extends ExternalDatabase implements GsonPostProcessable {
- private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class);
-
- // Cache of table name to table id.
- private Map tableNameToId = Maps.newConcurrentMap();
- @SerializedName(value = "idToTbl")
- private Map idToTbl = Maps.newConcurrentMap();
/**
* Create Elasticsearch external database.
@@ -55,106 +35,12 @@ public class EsExternalDatabase extends ExternalDatabase implem
* @param name database name.
*/
public EsExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
- super(extCatalog, id, name);
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newConcurrentMap();
- for (int i = 0; i < log.getRefreshCount(); i++) {
- EsExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- for (int i = 0; i < log.getCreateCount(); i++) {
- EsExternalTable table = new EsExternalTable(log.getCreateTableIds().get(i),
- log.getCreateTableNames().get(i), name, (EsExternalCatalog) catalog);
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- initialized = true;
- }
-
- public void setTableExtCatalog(ExternalCatalog extCatalog) {
- for (EsExternalTable table : idToTbl.values()) {
- table.setCatalog(extCatalog);
- }
+ super(extCatalog, id, name, InitDatabaseLog.Type.ES);
}
@Override
- protected void init() {
- InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
- initDatabaseLog.setType(InitDatabaseLog.Type.ES);
- initDatabaseLog.setCatalogId(extCatalog.getId());
- initDatabaseLog.setDbId(id);
- List tableNames = extCatalog.listTableNames(null, name);
- if (tableNames != null) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newHashMap();
- for (String tableName : tableNames) {
- long tblId;
- if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
- tblId = tableNameToId.get(tableName);
- tmpTableNameToId.put(tableName, tblId);
- EsExternalTable table = idToTbl.get(tblId);
- table.unsetObjectCreated();
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addRefreshTable(tblId);
- } else {
- tblId = Env.getCurrentEnv().getNextId();
- tmpTableNameToId.put(tableName, tblId);
- EsExternalTable table = new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog);
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addCreateTable(tblId, tableName);
- }
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- }
- initialized = true;
- Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
- }
-
- @Override
- public Set getTableNamesWithLock() {
- makeSureInitialized();
- return Sets.newHashSet(tableNameToId.keySet());
- }
-
- @Override
- public List getTables() {
- makeSureInitialized();
- return Lists.newArrayList(idToTbl.values());
- }
-
- @Override
- public EsExternalTable getTableNullable(String tableName) {
- makeSureInitialized();
- if (!tableNameToId.containsKey(tableName)) {
- return null;
- }
- return idToTbl.get(tableNameToId.get(tableName));
- }
-
- @Override
- public EsExternalTable getTableNullable(long tableId) {
- makeSureInitialized();
- return idToTbl.get(tableId);
- }
-
- public EsExternalTable getTableForReplay(long tableId) {
- return idToTbl.get(tableId);
- }
-
- @Override
- public void gsonPostProcess() throws IOException {
- tableNameToId = Maps.newConcurrentMap();
- for (EsExternalTable tbl : idToTbl.values()) {
- tableNameToId.put(tbl.getName(), tbl.getId());
- }
- rwLock = new ReentrantReadWriteLock(true);
+ protected EsExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog);
}
public void addTableForTest(EsExternalTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
index 0b2b2cd029..736b574500 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -26,9 +26,6 @@ import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.List;
/**
@@ -36,7 +33,6 @@ import java.util.List;
*/
public class EsExternalTable extends ExternalTable {
- private static final Logger LOG = LogManager.getLogger(EsExternalTable.class);
private EsTable esTable;
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index fec46d034c..bc16a766c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -32,6 +32,9 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterCatalogExecutor;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
@@ -41,6 +44,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -50,8 +54,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* @param External table type is ExternalTable or its subclass.
*/
-public class ExternalDatabase implements DatabaseIf, Writable, GsonPostProcessable {
-
+public abstract class ExternalDatabase
+ implements DatabaseIf, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class);
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
@@ -64,16 +68,14 @@ public class ExternalDatabase implements DatabaseIf,
protected DatabaseProperty dbProperties = new DatabaseProperty();
@SerializedName(value = "initialized")
protected boolean initialized = false;
+ // Cache of table name to table id.
+ protected Map tableNameToId = Maps.newConcurrentMap();
+ @SerializedName(value = "idToTbl")
+ protected Map idToTbl = Maps.newConcurrentMap();
+ protected final InitDatabaseLog.Type dbLogType;
protected ExternalCatalog extCatalog;
protected boolean invalidCacheInInit = true;
- /**
- * No args constructor for persist.
- */
- public ExternalDatabase() {
- initialized = false;
- }
-
/**
* Create external database.
*
@@ -81,10 +83,11 @@ public class ExternalDatabase implements DatabaseIf,
* @param id Database id.
* @param name Database name.
*/
- public ExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
+ public ExternalDatabase(ExternalCatalog extCatalog, long id, String name, InitDatabaseLog.Type dbLogType) {
this.extCatalog = extCatalog;
this.id = id;
this.name = name;
+ this.dbLogType = dbLogType;
}
public void setExtCatalog(ExternalCatalog extCatalog) {
@@ -92,6 +95,9 @@ public class ExternalDatabase implements DatabaseIf,
}
public void setTableExtCatalog(ExternalCatalog extCatalog) {
+ for (T table : idToTbl.values()) {
+ table.setCatalog(extCatalog);
+ }
}
public void setUnInitialized(boolean invalidCache) {
@@ -125,16 +131,61 @@ public class ExternalDatabase implements DatabaseIf,
}
}
- protected void init() {
- throw new NotImplementedException("init() is not implemented");
+ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+ Map tmpTableNameToId = Maps.newConcurrentMap();
+ Map tmpIdToTbl = Maps.newConcurrentMap();
+ for (int i = 0; i < log.getRefreshCount(); i++) {
+ T table = getTableForReplay(log.getRefreshTableIds().get(i));
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog);
+ tmpTableNameToId.put(table.getName(), table.getId());
+ tmpIdToTbl.put(table.getId(), table);
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ initialized = true;
}
+ protected void init() {
+ InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+ initDatabaseLog.setType(dbLogType);
+ initDatabaseLog.setCatalogId(extCatalog.getId());
+ initDatabaseLog.setDbId(id);
+ List tableNames = extCatalog.listTableNames(null, name);
+ if (tableNames != null) {
+ Map tmpTableNameToId = Maps.newConcurrentMap();
+ Map tmpIdToTbl = Maps.newHashMap();
+ for (String tableName : tableNames) {
+ long tblId;
+ if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
+ tblId = tableNameToId.get(tableName);
+ tmpTableNameToId.put(tableName, tblId);
+ T table = idToTbl.get(tblId);
+ table.unsetObjectCreated();
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addRefreshTable(tblId);
+ } else {
+ tblId = Env.getCurrentEnv().getNextId();
+ tmpTableNameToId.put(tableName, tblId);
+ T table = getExternalTable(tableName, tblId, extCatalog);
+ tmpIdToTbl.put(tblId, table);
+ initDatabaseLog.addCreateTable(tblId, tableName);
+ }
+ }
+ tableNameToId = tmpTableNameToId;
+ idToTbl = tmpIdToTbl;
+ }
+ initialized = true;
+ Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
+ }
+
+ protected abstract T getExternalTable(String tableName, long tblId, ExternalCatalog catalog);
+
public T getTableForReplay(long tableId) {
- throw new NotImplementedException("getTableForReplay() is not implemented");
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- throw new NotImplementedException("replayInitDb() is not implemented");
+ return idToTbl.get(tableId);
}
@Override
@@ -210,7 +261,8 @@ public class ExternalDatabase implements DatabaseIf,
@Override
public List getTables() {
- throw new NotImplementedException("getTables() is not implemented");
+ makeSureInitialized();
+ return Lists.newArrayList(idToTbl.values());
}
@Override
@@ -235,17 +287,23 @@ public class ExternalDatabase implements DatabaseIf,
@Override
public Set getTableNamesWithLock() {
- throw new NotImplementedException("getTableNamesWithLock() is not implemented");
+ makeSureInitialized();
+ return Sets.newHashSet(tableNameToId.keySet());
}
@Override
public T getTableNullable(String tableName) {
- throw new NotImplementedException("getTableNullable() is not implemented");
+ makeSureInitialized();
+ if (!tableNameToId.containsKey(tableName)) {
+ return null;
+ }
+ return idToTbl.get(tableNameToId.get(tableName));
}
@Override
public T getTableNullable(long tableId) {
- throw new NotImplementedException("getTableNullable() is not implemented");
+ makeSureInitialized();
+ return idToTbl.get(tableId);
}
@Override
@@ -253,13 +311,20 @@ public class ExternalDatabase implements DatabaseIf,
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
+ @SuppressWarnings("rawtypes")
public static ExternalDatabase read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ExternalDatabase.class);
}
@Override
- public void gsonPostProcess() throws IOException {}
+ public void gsonPostProcess() throws IOException {
+ tableNameToId = Maps.newConcurrentMap();
+ for (T tbl : idToTbl.values()) {
+ tableNameToId.put(tbl.getName(), tbl.getId());
+ }
+ rwLock = new ReentrantReadWriteLock(true);
+ }
@Override
public void dropTable(String tableName) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 2a72f4cf6e..7a8ff075e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -67,7 +67,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@SerializedName(value = "dbName")
protected String dbName;
- protected boolean objectCreated = false;
+ protected boolean objectCreated;
protected ExternalCatalog catalog;
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index 84cf3c4af7..01c62ee1ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -17,26 +17,17 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@@ -45,11 +36,6 @@ import java.util.stream.Collectors;
public class HMSExternalDatabase extends ExternalDatabase implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(HMSExternalDatabase.class);
- // Cache of table name to table id.
- private Map tableNameToId = Maps.newConcurrentMap();
- @SerializedName(value = "idToTbl")
- private Map idToTbl = Maps.newConcurrentMap();
-
/**
* Create HMS external database.
*
@@ -58,73 +44,12 @@ public class HMSExternalDatabase extends ExternalDatabase impl
* @param name database name.
*/
public HMSExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
- super(extCatalog, id, name);
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newConcurrentMap();
- for (int i = 0; i < log.getRefreshCount(); i++) {
- HMSExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- for (int i = 0; i < log.getCreateCount(); i++) {
- HMSExternalTable table = new HMSExternalTable(log.getCreateTableIds().get(i),
- log.getCreateTableNames().get(i), name, (HMSExternalCatalog) catalog);
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- initialized = true;
- }
-
- public void setTableExtCatalog(ExternalCatalog extCatalog) {
- for (HMSExternalTable table : idToTbl.values()) {
- table.setCatalog(extCatalog);
- }
+ super(extCatalog, id, name, InitDatabaseLog.Type.HMS);
}
@Override
- protected void init() {
- InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
- initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
- initDatabaseLog.setCatalogId(extCatalog.getId());
- initDatabaseLog.setDbId(id);
- List tableNames = extCatalog.listTableNames(null, name);
- if (tableNames != null) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newHashMap();
- for (String tableName : tableNames) {
- long tblId;
- if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
- tblId = tableNameToId.get(tableName);
- tmpTableNameToId.put(tableName, tblId);
- HMSExternalTable table = idToTbl.get(tblId);
- table.unsetObjectCreated();
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addRefreshTable(tblId);
- } else {
- tblId = Env.getCurrentEnv().getNextId();
- tmpTableNameToId.put(tableName, tblId);
- HMSExternalTable table = new HMSExternalTable(tblId, tableName, name,
- (HMSExternalCatalog) extCatalog);
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addCreateTable(tblId, tableName);
- }
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- }
- initialized = true;
- Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
- }
-
- @Override
- public List getTables() {
- makeSureInitialized();
- return Lists.newArrayList(idToTbl.values());
+ protected HMSExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog);
}
@Override
@@ -133,40 +58,6 @@ public class HMSExternalDatabase extends ExternalDatabase impl
return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
}
- @Override
- public Set getTableNamesWithLock() {
- makeSureInitialized();
- return Sets.newHashSet(tableNameToId.keySet());
- }
-
- @Override
- public HMSExternalTable getTableNullable(String tableName) {
- makeSureInitialized();
- if (!tableNameToId.containsKey(tableName)) {
- return null;
- }
- return idToTbl.get(tableNameToId.get(tableName));
- }
-
- @Override
- public HMSExternalTable getTableNullable(long tableId) {
- makeSureInitialized();
- return idToTbl.get(tableId);
- }
-
- public HMSExternalTable getTableForReplay(long tableId) {
- return idToTbl.get(tableId);
- }
-
- @Override
- public void gsonPostProcess() throws IOException {
- tableNameToId = Maps.newConcurrentMap();
- for (HMSExternalTable tbl : idToTbl.values()) {
- tableNameToId.put(tbl.getName(), tbl.getId());
- }
- rwLock = new ReentrantReadWriteLock(true);
- }
-
public void addTableForTest(HMSExternalTable tbl) {
idToTbl.put(tbl.getId(), tbl);
tableNameToId.put(tbl.getName(), tbl.getId());
@@ -188,8 +79,7 @@ public class HMSExternalDatabase extends ExternalDatabase impl
LOG.debug("create table [{}]", tableName);
makeSureInitialized();
tableNameToId.put(tableName, tableId);
- HMSExternalTable table = new HMSExternalTable(tableId, tableName, name,
- (HMSExternalCatalog) extCatalog);
+ HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog);
idToTbl.put(tableId, table);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
index db1d1e13ce..96175487e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
@@ -17,104 +17,30 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class IcebergExternalDatabase extends ExternalDatabase implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(IcebergExternalDatabase.class);
- // Cache of table name to table id.
- private Map tableNameToId = Maps.newConcurrentMap();
- @SerializedName(value = "idToTbl")
- private Map idToTbl = Maps.newConcurrentMap();
public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String name) {
- super(extCatalog, id, name);
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newConcurrentMap();
- for (int i = 0; i < log.getRefreshCount(); i++) {
- IcebergExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- for (int i = 0; i < log.getCreateCount(); i++) {
- IcebergExternalTable table = new IcebergExternalTable(log.getCreateTableIds().get(i),
- log.getCreateTableNames().get(i), name, (IcebergExternalCatalog) catalog);
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- initialized = true;
- }
-
- public void setTableExtCatalog(ExternalCatalog extCatalog) {
- for (IcebergExternalTable table : idToTbl.values()) {
- table.setCatalog(extCatalog);
- }
+ super(extCatalog, id, name, InitDatabaseLog.Type.ICEBERG);
}
@Override
- protected void init() {
- InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
- initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
- initDatabaseLog.setCatalogId(extCatalog.getId());
- initDatabaseLog.setDbId(id);
- List tableNames = extCatalog.listTableNames(null, name);
- if (tableNames != null) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newHashMap();
- for (String tableName : tableNames) {
- long tblId;
- if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
- tblId = tableNameToId.get(tableName);
- tmpTableNameToId.put(tableName, tblId);
- IcebergExternalTable table = idToTbl.get(tblId);
- table.unsetObjectCreated();
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addRefreshTable(tblId);
- } else {
- tblId = Env.getCurrentEnv().getNextId();
- tmpTableNameToId.put(tableName, tblId);
- IcebergExternalTable table = new IcebergExternalTable(tblId, tableName, name,
- (IcebergExternalCatalog) extCatalog);
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addCreateTable(tblId, tableName);
- }
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- }
- initialized = true;
- Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
- }
-
- @Override
- public List getTables() {
- makeSureInitialized();
- return Lists.newArrayList(idToTbl.values());
+ protected IcebergExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new IcebergExternalTable(tblId, tableName, name, (IcebergExternalCatalog) extCatalog);
}
@Override
@@ -123,41 +49,6 @@ public class IcebergExternalDatabase extends ExternalDatabase getTableNamesWithLock() {
- makeSureInitialized();
- return Sets.newHashSet(tableNameToId.keySet());
- }
-
- @Override
- public IcebergExternalTable getTableNullable(String tableName) {
- makeSureInitialized();
- if (!tableNameToId.containsKey(tableName)) {
- return null;
- }
- return idToTbl.get(tableNameToId.get(tableName));
- }
-
- @Override
- public IcebergExternalTable getTableNullable(long tableId) {
- makeSureInitialized();
- return idToTbl.get(tableId);
- }
-
- @Override
- public IcebergExternalTable getTableForReplay(long tableId) {
- return idToTbl.get(tableId);
- }
-
- @Override
- public void gsonPostProcess() throws IOException {
- tableNameToId = Maps.newConcurrentMap();
- for (IcebergExternalTable tbl : idToTbl.values()) {
- tableNameToId.put(tbl.getName(), tbl.getId());
- }
- rwLock = new ReentrantReadWriteLock(true);
- }
-
@Override
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
index f04a389570..e60ca3dfe4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java
@@ -17,32 +17,12 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class JdbcExternalDatabase extends ExternalDatabase implements GsonPostProcessable {
- private static final Logger LOG = LogManager.getLogger(JdbcExternalDatabase.class);
-
- // Cache of table name to table id.
- private Map tableNameToId = Maps.newConcurrentMap();
- @SerializedName(value = "idToTbl")
- private Map idToTbl = Maps.newConcurrentMap();
/**
* Create Jdbc external database.
@@ -52,107 +32,12 @@ public class JdbcExternalDatabase extends ExternalDatabase im
* @param name database name.
*/
public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
- super(extCatalog, id, name);
+ super(extCatalog, id, name, InitDatabaseLog.Type.JDBC);
}
@Override
- protected void init() {
- InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
- initDatabaseLog.setType(InitDatabaseLog.Type.JDBC);
- initDatabaseLog.setCatalogId(extCatalog.getId());
- initDatabaseLog.setDbId(id);
- List tableNames = extCatalog.listTableNames(null, name);
- if (tableNames != null) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newHashMap();
- for (String tableName : tableNames) {
- long tblId;
- if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
- tblId = tableNameToId.get(tableName);
- tmpTableNameToId.put(tableName, tblId);
- JdbcExternalTable table = idToTbl.get(tblId);
- table.unsetObjectCreated();
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addRefreshTable(tblId);
- } else {
- tblId = Env.getCurrentEnv().getNextId();
- tmpTableNameToId.put(tableName, tblId);
- JdbcExternalTable table = new JdbcExternalTable(tblId, tableName, name,
- (JdbcExternalCatalog) extCatalog);
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addCreateTable(tblId, tableName);
- }
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- }
- initialized = true;
- Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
- }
-
- public void setTableExtCatalog(ExternalCatalog extCatalog) {
- for (JdbcExternalTable table : idToTbl.values()) {
- table.setCatalog(extCatalog);
- }
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newConcurrentMap();
- for (int i = 0; i < log.getRefreshCount(); i++) {
- JdbcExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- for (int i = 0; i < log.getCreateCount(); i++) {
- JdbcExternalTable table = new JdbcExternalTable(log.getCreateTableIds().get(i),
- log.getCreateTableNames().get(i), name, (JdbcExternalCatalog) catalog);
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- initialized = true;
- }
-
- @Override
- public Set getTableNamesWithLock() {
- makeSureInitialized();
- return Sets.newHashSet(tableNameToId.keySet());
- }
-
- @Override
- public List getTables() {
- makeSureInitialized();
- return Lists.newArrayList(idToTbl.values());
- }
-
- @Override
- public JdbcExternalTable getTableNullable(String tableName) {
- makeSureInitialized();
- if (!tableNameToId.containsKey(tableName)) {
- return null;
- }
- return idToTbl.get(tableNameToId.get(tableName));
- }
-
- @Override
- public JdbcExternalTable getTableNullable(long tableId) {
- makeSureInitialized();
- return idToTbl.get(tableId);
- }
-
- public JdbcExternalTable getTableForReplay(long tableId) {
- return idToTbl.get(tableId);
- }
-
- @Override
- public void gsonPostProcess() throws IOException {
- tableNameToId = Maps.newConcurrentMap();
- for (JdbcExternalTable tbl : idToTbl.values()) {
- tableNameToId.put(tbl.getName(), tbl.getId());
- }
- rwLock = new ReentrantReadWriteLock(true);
+ protected JdbcExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog);
}
public void addTableForTest(JdbcExternalTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java
new file mode 100644
index 0000000000..56e892eab1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+/**
+ * MaxCompute external database.
+ */
+public class MaxComputeExternalDatabase extends ExternalDatabase
+ implements GsonPostProcessable {
+ /**
+ * Create MaxCompute external database.
+ *
+ * @param extCatalog External catalog this database belongs to.
+ * @param id database id.
+ * @param name database name.
+ */
+ public MaxComputeExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
+ super(extCatalog, id, name, InitDatabaseLog.Type.MAX_COMPUTE);
+ }
+
+ @Override
+ protected MaxComputeExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new MaxComputeExternalTable(tblId, tableName, name, (MaxComputeExternalCatalog) extCatalog);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
new file mode 100644
index 0000000000..5de781a686
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
+import org.apache.doris.thrift.TMCTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * MaxCompute external table.
+ */
+public class MaxComputeExternalTable extends ExternalTable {
+
+ private Table odpsTable;
+
+ public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
+ super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
+ }
+
+ @Override
+ protected synchronized void makeSureInitialized() {
+ super.makeSureInitialized();
+ if (!objectCreated) {
+ odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
+ objectCreated = true;
+ }
+ }
+
+ @Override
+ public List initSchema() {
+ makeSureInitialized();
+ List columns = odpsTable.getSchema().getColumns();
+ List result = Lists.newArrayListWithCapacity(columns.size());
+ for (com.aliyun.odps.Column field : columns) {
+ result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
+ true, field.getComment(), true, -1));
+ }
+ return result;
+ }
+
+ private Type mcTypeToDorisType(TypeInfo typeInfo) {
+ OdpsType odpsType = typeInfo.getOdpsType();
+ switch (odpsType) {
+ case VOID: {
+ return Type.NULL;
+ }
+ case BOOLEAN: {
+ return Type.BOOLEAN;
+ }
+ case TINYINT: {
+ return Type.TINYINT;
+ }
+ case SMALLINT: {
+ return Type.SMALLINT;
+ }
+ case INT: {
+ return Type.INT;
+ }
+ case BIGINT: {
+ return Type.BIGINT;
+ }
+ case CHAR: {
+ return Type.CHAR;
+ }
+ case VARCHAR: {
+ return Type.VARCHAR;
+ }
+ case STRING: {
+ return Type.STRING;
+ }
+ case JSON: {
+ return Type.UNSUPPORTED;
+ // return Type.JSONB;
+ }
+ case FLOAT: {
+ return Type.FLOAT;
+ }
+ case DOUBLE: {
+ return Type.DOUBLE;
+ }
+ case DECIMAL: {
+ DecimalTypeInfo decimal = (DecimalTypeInfo) typeInfo;
+ return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
+ }
+ case DATE: {
+ return ScalarType.createDateV2Type();
+ }
+ case DATETIME:
+ case TIMESTAMP: {
+ return ScalarType.createDatetimeV2Type(3);
+ }
+ case ARRAY: {
+ ArrayTypeInfo arrayType = (ArrayTypeInfo) typeInfo;
+ Type innerType = mcTypeToDorisType(arrayType.getElementTypeInfo());
+ return ArrayType.create(innerType, true);
+ }
+ case MAP: {
+ MapTypeInfo mapType = (MapTypeInfo) typeInfo;
+ return new MapType(mcTypeToDorisType(mapType.getKeyTypeInfo()),
+ mcTypeToDorisType(mapType.getValueTypeInfo()));
+ }
+ case STRUCT: {
+ ArrayList fields = new ArrayList<>();
+ StructTypeInfo structType = (StructTypeInfo) typeInfo;
+ List fieldNames = structType.getFieldNames();
+ List fieldTypeInfos = structType.getFieldTypeInfos();
+ for (int i = 0; i < structType.getFieldCount(); i++) {
+ Type innerType = mcTypeToDorisType(fieldTypeInfos.get(i));
+ fields.add(new StructField(fieldNames.get(i), innerType));
+ }
+ return new StructType(fields);
+ }
+ case BINARY:
+ case INTERVAL_DAY_TIME:
+ case INTERVAL_YEAR_MONTH:
+ return Type.UNSUPPORTED;
+ default:
+ throw new IllegalArgumentException("Cannot transform unknown type: " + odpsType);
+ }
+ }
+
+ @Override
+ public TTableDescriptor toThrift() {
+ List schema = getFullSchema();
+ TMCTable tMcTable = new TMCTable();
+ tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl());
+ // use mc project as dbName
+ tMcTable.setProject(dbName);
+ tMcTable.setTable(name);
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE,
+ schema.size(), 0, getName(), dbName);
+ tTableDescriptor.setMcTable(tMcTable);
+ return tTableDescriptor;
+ }
+
+ @Override
+ public String getMysqlType() {
+ return "BASE TABLE";
+ }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java
index fe1852241d..fbac107f45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java
@@ -17,134 +17,19 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.test.TestExternalCatalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class TestExternalDatabase extends ExternalDatabase implements GsonPostProcessable {
- private static final Logger LOG = LogManager.getLogger(TestExternalDatabase.class);
-
- // Cache of table name to table id.
- private Map tableNameToId = Maps.newConcurrentMap();
- @SerializedName(value = "idToTbl")
- private Map idToTbl = Maps.newConcurrentMap();
public TestExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
- super(extCatalog, id, name);
+ super(extCatalog, id, name, InitDatabaseLog.Type.TEST);
}
@Override
- protected void init() {
- InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
- initDatabaseLog.setType(InitDatabaseLog.Type.TEST);
- initDatabaseLog.setCatalogId(extCatalog.getId());
- initDatabaseLog.setDbId(id);
- List tableNames = extCatalog.listTableNames(null, name);
- if (tableNames != null) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newHashMap();
- for (String tableName : tableNames) {
- long tblId;
- if (tableNameToId != null && tableNameToId.containsKey(tableName)) {
- tblId = tableNameToId.get(tableName);
- tmpTableNameToId.put(tableName, tblId);
- TestExternalTable table = idToTbl.get(tblId);
- table.unsetObjectCreated();
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addRefreshTable(tblId);
- } else {
- tblId = Env.getCurrentEnv().getNextId();
- tmpTableNameToId.put(tableName, tblId);
- TestExternalTable table = new TestExternalTable(tblId, tableName, name,
- (TestExternalCatalog) extCatalog);
- tmpIdToTbl.put(tblId, table);
- initDatabaseLog.addCreateTable(tblId, tableName);
- }
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- }
- initialized = true;
- Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
- }
-
- public void setTableExtCatalog(ExternalCatalog extCatalog) {
- for (TestExternalTable table : idToTbl.values()) {
- table.setCatalog(extCatalog);
- }
- }
-
- public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
- Map tmpTableNameToId = Maps.newConcurrentMap();
- Map tmpIdToTbl = Maps.newConcurrentMap();
- for (int i = 0; i < log.getRefreshCount(); i++) {
- TestExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i));
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- for (int i = 0; i < log.getCreateCount(); i++) {
- TestExternalTable table = new TestExternalTable(log.getCreateTableIds().get(i),
- log.getCreateTableNames().get(i), name, (TestExternalCatalog) catalog);
- tmpTableNameToId.put(table.getName(), table.getId());
- tmpIdToTbl.put(table.getId(), table);
- }
- tableNameToId = tmpTableNameToId;
- idToTbl = tmpIdToTbl;
- initialized = true;
- }
-
- @Override
- public Set getTableNamesWithLock() {
- makeSureInitialized();
- return Sets.newHashSet(tableNameToId.keySet());
- }
-
- @Override
- public List getTables() {
- makeSureInitialized();
- return Lists.newArrayList(idToTbl.values());
- }
-
- @Override
- public TestExternalTable getTableNullable(String tableName) {
- makeSureInitialized();
- if (!tableNameToId.containsKey(tableName)) {
- return null;
- }
- return idToTbl.get(tableNameToId.get(tableName));
- }
-
- @Override
- public TestExternalTable getTableNullable(long tableId) {
- makeSureInitialized();
- return idToTbl.get(tableId);
- }
-
- public TestExternalTable getTableForReplay(long tableId) {
- return idToTbl.get(tableId);
- }
-
- @Override
- public void gsonPostProcess() throws IOException {
- tableNameToId = Maps.newConcurrentMap();
- for (TestExternalTable tbl : idToTbl.values()) {
- tableNameToId.put(tbl.getName(), tbl.getId());
- }
- rwLock = new ReentrantReadWriteLock(true);
+ protected TestExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+ return new TestExternalTable(tblId, tableName, name, (TestExternalCatalog) extCatalog);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 2da0fc4dd1..4a52b9b110 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -113,6 +113,9 @@ public class CatalogFactory {
case "iceberg":
catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props);
break;
+ case "max_compute":
+ catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props);
+ break;
case "test":
if (!FeConstants.runningUnitTest) {
throw new DdlException("test catalog is only for FE unit test");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index 52f9bc984f..0b81973d31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -20,6 +20,8 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.external.EsExternalDatabase;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.external.elasticsearch.EsRestClient;
import com.google.common.collect.Lists;
@@ -29,7 +31,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -47,7 +48,7 @@ public class EsExternalCatalog extends ExternalCatalog {
* Default constructor for EsExternalCatalog.
*/
public EsExternalCatalog(long catalogId, String name, String resource, Map props) {
- super(catalogId, name);
+ super(catalogId, name, InitCatalogLog.Type.ES);
this.type = "es";
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
}
@@ -124,7 +125,7 @@ public class EsExternalCatalog extends ExternalCatalog {
protected void init() {
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
- initCatalogLog.setType(InitCatalogLog.Type.ES);
+ initCatalogLog.setType(logType);
if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) {
idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(invalidCacheInInit);
initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB));
@@ -133,19 +134,13 @@ public class EsExternalCatalog extends ExternalCatalog {
idToDb = Maps.newConcurrentMap();
long defaultDbId = Env.getCurrentEnv().getNextId();
dbNameToId.put(DEFAULT_DB, defaultDbId);
- EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId, DEFAULT_DB);
+ ExternalDatabase extends ExternalTable> db = getDbForInit(DEFAULT_DB, defaultDbId, logType);
idToDb.put(defaultDbId, db);
initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB);
}
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
- @Override
- public List listDatabaseNames(SessionContext ctx) {
- makeSureInitialized();
- return new ArrayList<>(dbNameToId.keySet());
- }
-
@Override
public List listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index adcb109ace..6198796688 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
+import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
+import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
@@ -51,6 +53,7 @@ import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -59,7 +62,8 @@ import java.util.Optional;
* The abstract class for all types of external catalogs.
*/
@Data
-public abstract class ExternalCatalog implements CatalogIf, Writable, GsonPostProcessable {
+public abstract class ExternalCatalog
+ implements CatalogIf>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
// Unique id of this catalog, will be assigned after catalog is loaded.
@@ -69,13 +73,15 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
protected String name;
@SerializedName(value = "type")
protected String type;
+ // TODO: replace type with log type
+ protected final InitCatalogLog.Type logType;
// save properties of this catalog, such as hive meta store url.
@SerializedName(value = "catalogProperty")
protected CatalogProperty catalogProperty;
@SerializedName(value = "initialized")
private boolean initialized = false;
@SerializedName(value = "idToDb")
- protected Map idToDb = Maps.newConcurrentMap();
+ protected Map> idToDb = Maps.newConcurrentMap();
// db name does not contains "default_cluster"
protected Map dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
@@ -84,15 +90,25 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
private ExternalSchemaCache schemaCache;
private String comment;
- public ExternalCatalog(long catalogId, String name) {
+ public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType) {
this.id = catalogId;
this.name = name;
+ this.logType = logType;
+ }
+
+ protected List listDatabaseNames() {
+ throw new UnsupportedOperationException("Unsupported operation: "
+ + "listDatabaseNames from remote client when init catalog with " + logType.name());
}
/**
* @return names of database in this catalog.
*/
- public abstract List listDatabaseNames(SessionContext ctx);
+ // public abstract List listDatabaseNames(SessionContext ctx);
+ public List listDatabaseNames(SessionContext ctx) {
+ makeSureInitialized();
+ return new ArrayList<>(dbNameToId.keySet());
+ }
/**
* @param dbName
@@ -204,7 +220,43 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
}
// init schema related objects
- protected abstract void init();
+ protected void init() {
+ Map tmpDbNameToId = Maps.newConcurrentMap();
+ Map> tmpIdToDb = Maps.newConcurrentMap();
+ InitCatalogLog initCatalogLog = new InitCatalogLog();
+ initCatalogLog.setCatalogId(id);
+ initCatalogLog.setType(logType);
+ List allDatabases = listDatabaseNames();
+ Map includeDatabaseMap = getIncludeDatabaseMap();
+ Map excludeDatabaseMap = getExcludeDatabaseMap();
+ for (String dbName : allDatabases) {
+ // Exclude database map take effect with higher priority over include database map
+ if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
+ continue;
+ }
+ if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
+ continue;
+ }
+ long dbId;
+ if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
+ dbId = dbNameToId.get(dbName);
+ tmpDbNameToId.put(dbName, dbId);
+ ExternalDatabase extends ExternalTable> db = idToDb.get(dbId);
+ db.setUnInitialized(invalidCacheInInit);
+ tmpIdToDb.put(dbId, db);
+ initCatalogLog.addRefreshDb(dbId);
+ } else {
+ dbId = Env.getCurrentEnv().getNextId();
+ tmpDbNameToId.put(dbName, dbId);
+ ExternalDatabase extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
+ tmpIdToDb.put(dbId, db);
+ initCatalogLog.addCreateDb(dbId, dbName);
+ }
+ }
+ dbNameToId = tmpDbNameToId;
+ idToDb = tmpIdToDb;
+ Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
+ }
public void setUninitialized(boolean invalidCache) {
this.objectCreated = false;
@@ -219,17 +271,13 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
}
- public ExternalDatabase getDbForReplay(long dbId) {
- return idToDb.get(dbId);
- }
-
public final List getSchema(String dbName, String tblName) {
makeSureInitialized();
- Optional db = getDb(dbName);
+ Optional> db = getDb(dbName);
if (db.isPresent()) {
- Optional table = db.get().getTable(tblName);
+ Optional extends ExternalTable> table = db.get().getTable(tblName);
if (table.isPresent()) {
- return ((ExternalTable) table.get()).initSchema();
+ return table.get().initSchema();
}
}
// return one column with unsupported type.
@@ -287,7 +335,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
@Nullable
@Override
- public ExternalDatabase getDbNullable(String dbName) {
+ public ExternalDatabase extends ExternalTable> getDbNullable(String dbName) {
try {
makeSureInitialized();
} catch (Exception e) {
@@ -303,7 +351,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
@Nullable
@Override
- public ExternalDatabase getDbNullable(long dbId) {
+ public ExternalDatabase extends ExternalTable> getDbNullable(long dbId) {
try {
makeSureInitialized();
} catch (Exception e) {
@@ -358,54 +406,51 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
public void replayInitCatalog(InitCatalogLog log) {
Map tmpDbNameToId = Maps.newConcurrentMap();
- Map tmpIdToDb = Maps.newConcurrentMap();
+ Map> tmpIdToDb = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
- ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i));
+ ExternalDatabase extends ExternalTable> db = getDbForReplay(log.getRefreshDbIds().get(i));
db.setUnInitialized(invalidCacheInInit);
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
- switch (log.getType()) {
- case HMS:
- for (int i = 0; i < log.getCreateCount(); i++) {
- HMSExternalDatabase db = new HMSExternalDatabase(
- this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
- tmpDbNameToId.put(db.getFullName(), db.getId());
- tmpIdToDb.put(db.getId(), db);
- }
- break;
- case ES:
- for (int i = 0; i < log.getCreateCount(); i++) {
- EsExternalDatabase db = new EsExternalDatabase(
- this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
- tmpDbNameToId.put(db.getFullName(), db.getId());
- tmpIdToDb.put(db.getId(), db);
- }
- break;
- case JDBC:
- for (int i = 0; i < log.getCreateCount(); i++) {
- JdbcExternalDatabase db = new JdbcExternalDatabase(
- this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
- tmpDbNameToId.put(db.getFullName(), db.getId());
- tmpIdToDb.put(db.getId(), db);
- }
- break;
- case ICEBERG:
- for (int i = 0; i < log.getCreateCount(); i++) {
- IcebergExternalDatabase db = new IcebergExternalDatabase(
- this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i));
- tmpDbNameToId.put(db.getFullName(), db.getId());
- tmpIdToDb.put(db.getId(), db);
- }
- break;
- default:
- break;
+ for (int i = 0; i < log.getCreateCount(); i++) {
+ ExternalDatabase extends ExternalTable> db =
+ getDbForInit(log.getCreateDbNames().get(i), log.getCreateDbIds().get(i), log.getType());
+ if (db != null) {
+ tmpDbNameToId.put(db.getFullName(), db.getId());
+ tmpIdToDb.put(db.getId(), db);
+ }
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
initialized = true;
}
+ public ExternalDatabase extends ExternalTable> getDbForReplay(long dbId) {
+ return idToDb.get(dbId);
+ }
+
+ protected ExternalDatabase extends ExternalTable> getDbForInit(String dbName, long dbId,
+ InitCatalogLog.Type logType) {
+ switch (logType) {
+ case HMS:
+ return new HMSExternalDatabase(this, dbId, dbName);
+ case ES:
+ return new EsExternalDatabase(this, dbId, dbName);
+ case JDBC:
+ return new JdbcExternalDatabase(this, dbId, dbName);
+ case ICEBERG:
+ return new IcebergExternalDatabase(this, dbId, dbName);
+ case MAX_COMPUTE:
+ return new MaxComputeExternalDatabase(this, dbId, dbName);
+ case TEST:
+ return new TestExternalDatabase(this, dbId, dbName);
+ default:
+ break;
+ }
+ return null;
+ }
+
/**
* External catalog has no cluster semantics.
*/
@@ -425,7 +470,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
idToDb = Maps.newConcurrentMap();
}
dbNameToId = Maps.newConcurrentMap();
- for (ExternalDatabase db : idToDb.values()) {
+ for (ExternalDatabase extends ExternalTable> db : idToDb.values()) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
db.setExtCatalog(this);
db.setTableExtCatalog(this);
@@ -436,7 +481,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
}
}
- public void addDatabaseForTest(ExternalDatabase db) {
+ public void addDatabaseForTest(ExternalDatabase extends ExternalTable> db) {
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
@@ -449,15 +494,15 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
throw new NotImplementedException("createDatabase not implemented");
}
- public Map getIncludeDatabaseMap() {
+ public Map getIncludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.INCLUDE_DATABASE_LIST);
}
- public Map getExcludeDatabaseMap() {
+ public Map getExcludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.EXCLUDE_DATABASE_LIST);
}
- public Map getSpecifiedDatabaseMap(String catalogPropertyKey) {
+ public Map getSpecifiedDatabaseMap(String catalogPropertyKey) {
String specifiedDatabaseList = catalogProperty.getOrDefault(catalogPropertyKey, "");
Map specifiedDatabaseMap = Maps.newHashMap();
specifiedDatabaseList = specifiedDatabaseList.trim();
@@ -465,8 +510,8 @@ public abstract class ExternalCatalog implements CatalogIf, Wr
return specifiedDatabaseMap;
}
String[] databaseList = specifiedDatabaseList.split(",");
- for (int i = 0; i < databaseList.length; i++) {
- String dbname = databaseList[i].trim();
+ for (String database : databaseList) {
+ String dbname = database.trim();
if (!dbname.isEmpty()) {
specifiedDatabaseMap.put(dbname, true);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index f5b039a7cf..7846847c83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -31,7 +32,6 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,7 +70,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
* Default constructor for HMSExternalCatalog.
*/
public HMSExternalCatalog(long catalogId, String name, String resource, Map props) {
- super(catalogId, name);
+ super(catalogId, name, InitCatalogLog.Type.HMS);
this.type = "hms";
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
@@ -122,44 +122,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
}
- @Override
- protected void init() {
- Map tmpDbNameToId = Maps.newConcurrentMap();
- Map tmpIdToDb = Maps.newConcurrentMap();
- InitCatalogLog initCatalogLog = new InitCatalogLog();
- initCatalogLog.setCatalogId(id);
- initCatalogLog.setType(InitCatalogLog.Type.HMS);
- List allDatabases = client.getAllDatabases();
- Map includeDatabaseMap = getIncludeDatabaseMap();
- Map excludeDatabaseMap = getExcludeDatabaseMap();
- // Update the db name to id map.
- for (String dbName : allDatabases) {
- // Exclude database map take effect with higher priority over include database map
- if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
- continue;
- }
- if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
- continue;
- }
- long dbId;
- if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
- dbId = dbNameToId.get(dbName);
- tmpDbNameToId.put(dbName, dbId);
- ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized(invalidCacheInInit);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addRefreshDb(dbId);
- } else {
- dbId = Env.getCurrentEnv().getNextId();
- tmpDbNameToId.put(dbName, dbId);
- HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addCreateDb(dbId, dbName);
- }
- }
- dbNameToId = tmpDbNameToId;
- idToDb = tmpIdToDb;
- Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
+ protected List listDatabaseNames() {
+ return client.getAllDatabases();
}
@Override
@@ -193,12 +157,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE);
}
- @Override
- public List listDatabaseNames(SessionContext ctx) {
- makeSureInitialized();
- return Lists.newArrayList(dbNameToId.keySet());
- }
-
@Override
public List listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
@@ -292,7 +250,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
makeSureInitialized();
LOG.debug("create database [{}]", dbName);
dbNameToId.put(dbName, dbId);
- HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
+ ExternalDatabase extends ExternalTable> db = getDbForInit(dbName, dbId, logType);
idToDb.put(dbId, db);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index b9d0ce185d..d20a15d837 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable {
ES,
JDBC,
ICEBERG,
+ MAX_COMPUTE,
TEST,
UNKNOWN;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index 9036353d30..9c3ad72b7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -34,8 +34,10 @@ import java.util.List;
public class InitDatabaseLog implements Writable {
public enum Type {
HMS,
+ ICEBERG,
ES,
JDBC,
+ MAX_COMPUTE,
TEST,
UNKNOWN;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
index 3ce9a1ad94..ec898af623 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
@@ -17,9 +17,7 @@
package org.apache.doris.datasource;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
-import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.common.DdlException;
import org.apache.doris.external.jdbc.JdbcClient;
@@ -51,7 +49,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
public JdbcExternalCatalog(long catalogId, String name, String resource, Map props)
throws DdlException {
- super(catalogId, name);
+ super(catalogId, name, InitCatalogLog.Type.JDBC);
this.type = "jdbc";
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
}
@@ -140,40 +138,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap());
}
- @Override
- protected void init() {
- Map tmpDbNameToId = Maps.newConcurrentMap();
- Map tmpIdToDb = Maps.newConcurrentMap();
- InitCatalogLog initCatalogLog = new InitCatalogLog();
- initCatalogLog.setCatalogId(id);
- initCatalogLog.setType(InitCatalogLog.Type.JDBC);
- List allDatabaseNames = jdbcClient.getDatabaseNameList();
- for (String dbName : allDatabaseNames) {
- long dbId;
- if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
- dbId = dbNameToId.get(dbName);
- tmpDbNameToId.put(dbName, dbId);
- ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized(invalidCacheInInit);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addRefreshDb(dbId);
- } else {
- dbId = Env.getCurrentEnv().getNextId();
- tmpDbNameToId.put(dbName, dbId);
- JdbcExternalDatabase db = new JdbcExternalDatabase(this, dbId, dbName);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addCreateDb(dbId, dbName);
- }
- }
- dbNameToId = tmpDbNameToId;
- idToDb = tmpIdToDb;
- Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
- }
-
- @Override
- public List listDatabaseNames(SessionContext ctx) {
- makeSureInitialized();
- return Lists.newArrayList(dbNameToId.keySet());
+ protected List listDatabaseNames() {
+ return jdbcClient.getDatabaseNameList();
}
@Override
@@ -182,7 +148,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcExternalDatabase db = (JdbcExternalDatabase) idToDb.get(dbNameToId.get(dbName));
if (db != null && db.isInitialized()) {
List names = Lists.newArrayList();
- db.getTables().stream().forEach(table -> names.add(table.getName()));
+ db.getTables().forEach(table -> names.add(table.getName()));
return names;
} else {
return jdbcClient.getTablesNameList(dbName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
new file mode 100644
index 0000000000..1b925df791
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.doris.datasource.property.constants.MCProperties;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MaxComputeExternalCatalog extends ExternalCatalog {
+ private Odps odps;
+ private String tunnelUrl;
+ private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
+ private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com";
+
+ public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map props) {
+ super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE);
+ this.type = "max_compute";
+ catalogProperty = new CatalogProperty(resource, props);
+ }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ Map props = catalogProperty.getProperties();
+ String region = props.get(MCProperties.REGION);
+ String defaultProject = props.get(MCProperties.PROJECT);
+ if (Strings.isNullOrEmpty(region)) {
+ throw new IllegalArgumentException("Missing required property '" + MCProperties.REGION + "'.");
+ }
+ if (Strings.isNullOrEmpty(defaultProject)) {
+ throw new IllegalArgumentException("Missing required property '" + MCProperties.PROJECT + "'.");
+ }
+ if (region.startsWith("oss-")) {
+ // may use oss-cn-beijing, ensure compatible
+ region = region.replace("oss-", "");
+ }
+ this.tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+ CloudCredential credential = MCProperties.getCredential(props);
+ Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey());
+ this.odps = new Odps(account);
+ odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
+ odps.setDefaultProject(defaultProject);
+ }
+
+ public Odps getClient() {
+ makeSureInitialized();
+ return odps;
+ }
+
+ protected List listDatabaseNames() {
+ List result = new ArrayList<>();
+ try {
+ result.add(odps.projects().get(odps.getDefaultProject()).getName());
+ } catch (OdpsException e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+ makeSureInitialized();
+ try {
+ return odps.tables().exists(tblName);
+ } catch (OdpsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List listTableNames(SessionContext ctx, String dbName) {
+ makeSureInitialized();
+ List result = new ArrayList<>();
+ odps.tables().forEach(e -> result.add(e.getName()));
+ return result;
+ }
+
+ /**
+ * data tunnel url
+ * @return tunnelUrl, required by jni scanner.
+ */
+ public String getTunnelUrl() {
+ makeSureInitialized();
+ return tunnelUrl;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 22aa3bf5c6..3fc528568f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,9 +17,6 @@
package org.apache.doris.datasource.iceberg;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.Util;
@@ -27,7 +24,6 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.catalog.Catalog;
@@ -37,7 +33,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -55,47 +50,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected SupportsNamespaces nsCatalog;
public IcebergExternalCatalog(long catalogId, String name) {
- super(catalogId, name);
+ super(catalogId, name, InitCatalogLog.Type.ICEBERG);
}
@Override
protected void init() {
nsCatalog = (SupportsNamespaces) catalog;
- Map tmpDbNameToId = Maps.newConcurrentMap();
- Map tmpIdToDb = Maps.newConcurrentMap();
- InitCatalogLog initCatalogLog = new InitCatalogLog();
- initCatalogLog.setCatalogId(id);
- initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
- List allDatabaseNames = listDatabaseNames();
- Map includeDatabaseMap = getIncludeDatabaseMap();
- Map excludeDatabaseMap = getExcludeDatabaseMap();
- for (String dbName : allDatabaseNames) {
- // Exclude database map take effect with higher priority over include database map
- if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
- continue;
- }
- if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
- continue;
- }
- long dbId;
- if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
- dbId = dbNameToId.get(dbName);
- tmpDbNameToId.put(dbName, dbId);
- ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized(invalidCacheInInit);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addRefreshDb(dbId);
- } else {
- dbId = Env.getCurrentEnv().getNextId();
- tmpDbNameToId.put(dbName, dbId);
- IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addCreateDb(dbId, dbName);
- }
- }
- dbNameToId = tmpDbNameToId;
- idToDb = tmpIdToDb;
- Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
+ super.init();
}
protected Configuration getConfiguration() {
@@ -137,12 +98,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
.collect(Collectors.toList());
}
- @Override
- public List listDatabaseNames(SessionContext ctx) {
- makeSureInitialized();
- return new ArrayList<>(dbNameToId.keySet());
- }
-
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
new file mode 100644
index 0000000000..0fb4274049
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.constants;
+
+import org.apache.doris.datasource.credentials.CloudCredential;
+
+import java.util.Map;
+
+/**
+ * properties for aliyun max compute
+ */
+public class MCProperties extends BaseProperties {
+ public static final String REGION = "mc.region";
+ public static final String PROJECT = "mc.default.project";
+ public static final String ACCESS_KEY = "mc.access_key";
+ public static final String SECRET_KEY = "mc.secret_key";
+ public static final String SESSION_TOKEN = "mc.session_token";
+
+ public static CloudCredential getCredential(Map props) {
+ return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java
index 06dbd3f452..e6d72a1463 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java
@@ -18,8 +18,6 @@
package org.apache.doris.datasource.test;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@@ -27,7 +25,6 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,18 +42,14 @@ public class TestExternalCatalog extends ExternalCatalog {
private TestCatalogProvider catalogProvider;
public TestExternalCatalog(long catalogId, String name, String resource, Map props) {
- super(catalogId, name);
+ super(catalogId, name, InitCatalogLog.Type.TEST);
this.type = "test";
this.catalogProperty = new CatalogProperty(resource, props);
Class> providerClazz = null;
try {
providerClazz = Class.forName(props.get("catalog_provider.class"));
this.catalogProvider = (TestCatalogProvider) providerClazz.newInstance();
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@@ -65,38 +58,7 @@ public class TestExternalCatalog extends ExternalCatalog {
protected void initLocalObjectsImpl() {
}
- @Override
- protected void init() {
- Map tmpDbNameToId = Maps.newConcurrentMap();
- Map tmpIdToDb = Maps.newConcurrentMap();
- InitCatalogLog initCatalogLog = new InitCatalogLog();
- initCatalogLog.setCatalogId(id);
- initCatalogLog.setType(InitCatalogLog.Type.TEST);
- List allDatabaseNames = mockedDatabaseNames();
- for (String dbName : allDatabaseNames) {
- long dbId;
- if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
- dbId = dbNameToId.get(dbName);
- tmpDbNameToId.put(dbName, dbId);
- ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized(invalidCacheInInit);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addRefreshDb(dbId);
- } else {
- dbId = Env.getCurrentEnv().getNextId();
- tmpDbNameToId.put(dbName, dbId);
- TestExternalDatabase db = new TestExternalDatabase(this, dbId, dbName);
- tmpIdToDb.put(dbId, db);
- initCatalogLog.addCreateDb(dbId, dbName);
- }
- }
- dbNameToId = tmpDbNameToId;
- idToDb = tmpIdToDb;
- Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
- }
-
- private List mockedDatabaseNames() {
-
+ protected List listDatabaseNames() {
return Lists.newArrayList(catalogProvider.getMetadata().keySet());
}
@@ -117,12 +79,6 @@ public class TestExternalCatalog extends ExternalCatalog {
return catalogProvider.getMetadata().get(dbName).get(tblName);
}
- @Override
- public List listDatabaseNames(SessionContext ctx) {
- makeSureInitialized();
- return Lists.newArrayList(dbNameToId.keySet());
- }
-
@Override
public List listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index d48097dea8..60fd334b7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -47,11 +47,14 @@ import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalTable;
+import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
+import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.JdbcExternalCatalog;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
@@ -183,7 +186,8 @@ public class GsonUtils {
.registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
- .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName());
+ .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
+ .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
private static RuntimeTypeAdapterFactory dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
@@ -191,14 +195,16 @@ public class GsonUtils {
.registerSubtype(EsExternalDatabase.class, EsExternalDatabase.class.getSimpleName())
.registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
.registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
- .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName());
+ .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
+ .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
private static RuntimeTypeAdapterFactory tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
TableIf.class, "clazz").registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
.registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName())
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
- .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName());
+ .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
+ .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
// runtime adapter for class "HeartbeatResponse"
private static RuntimeTypeAdapterFactory hbResponseTypeAdapterFactory
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 92e2b94148..c4e74c0139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -71,9 +71,11 @@ import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.HudiScanNode;
+import org.apache.doris.planner.external.MaxComputeScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
+import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TNullSide;
import org.apache.doris.thrift.TPushAggOp;
@@ -2020,6 +2022,11 @@ public class SingleNodePlanner {
case ICEBERG_EXTERNAL_TABLE:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
+ case MAX_COMPUTE_EXTERNAL_TABLE:
+ // TODO: support max compute scan node
+ scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
+ StatisticalType.MAX_COMPUTE_SCAN_NODE, true);
+ break;
case ES_EXTERNAL_TABLE:
scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true);
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index edd63e2bec..9c5f0f4fbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -325,13 +325,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
return rangeDesc;
}
- protected TFileType getLocationType() throws UserException {
- throw new NotImplementedException("");
- }
+ protected abstract TFileType getLocationType() throws UserException;
- protected TFileFormatType getFileFormatType() throws UserException {
- throw new NotImplementedException("");
- }
+ protected abstract TFileFormatType getFileFormatType() throws UserException;
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
@@ -341,17 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
throw new NotImplementedException("");
}
- protected List getPathPartitionKeys() throws UserException {
- throw new NotImplementedException("");
- }
+ protected abstract List getPathPartitionKeys() throws UserException;
- protected TableIf getTargetTable() throws UserException {
- throw new NotImplementedException("");
- }
+ protected abstract TableIf getTargetTable() throws UserException;
- protected Map getLocationProperties() throws UserException {
- throw new NotImplementedException("");
- }
+ protected abstract Map getLocationProperties() throws UserException;
// eg: hdfs://namenode s3://buckets
protected String getFsName(FileSplit split) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
new file mode 100644
index 0000000000..367576ba6c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MaxComputeScanNode extends FileQueryScanNode {
+
+ private final MaxComputeExternalTable table;
+
+ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
+ StatisticalType statisticalType, boolean needCheckColumnPriv) {
+ super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ table = (MaxComputeExternalTable) desc.getTable();
+ }
+
+ @Override
+ protected TFileType getLocationType() throws UserException {
+ return TFileType.FILE_STREAM;
+ }
+
+ @Override
+ public TFileFormatType getFileFormatType() {
+ // TODO: use max compute format
+ return TFileFormatType.FORMAT_PARQUET;
+ }
+
+ @Override
+ public List getPathPartitionKeys() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected TableIf getTargetTable() throws UserException {
+ return table;
+ }
+
+ @Override
+ protected Map getLocationProperties() throws UserException {
+ return new HashMap<>();
+ }
+
+ @Override
+ protected List getSplits() throws UserException {
+ List result = new ArrayList<>();
+ result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList()));
+ return result;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index c39be3cf2c..1d1857856d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -48,6 +48,7 @@ public enum StatisticalType {
UNION_NODE,
TABLE_VALUED_FUNCTION_NODE,
FILE_SCAN_NODE,
+ MAX_COMPUTE_SCAN_NODE,
METADATA_SCAN_NODE,
JDBC_SCAN_NODE,
TEST_EXTERNAL_TABLE,
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index b518bdd219..0c5d04e7a6 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -308,6 +308,12 @@ struct TJdbcTable {
8: optional string jdbc_driver_checksum
}
+struct TMCTable {
+ 1: optional string tunnel_url
+ 2: optional string project
+ 3: optional string table
+}
+
// "Union" of all table types.
struct TTableDescriptor {
1: required Types.TTableId id
@@ -330,6 +336,7 @@ struct TTableDescriptor {
18: optional TIcebergTable icebergTable
19: optional THudiTable hudiTable
20: optional TJdbcTable jdbcTable
+ 21: optional TMCTable mcTable
}
struct TDescriptorTable {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 0c5c7d6f66..503007a6d3 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -596,6 +596,7 @@ enum TTableType {
HUDI_TABLE,
JDBC_TABLE,
TEST_EXTERNAL_TABLE,
+ MAX_COMPUTE_TABLE,
}
enum TKeysType {