From 3f2d1ae9a4e96ac6ae1589eaa77f541653f1a7d3 Mon Sep 17 00:00:00 2001 From: slothever <18522955+wsjz@users.noreply.github.com> Date: Tue, 16 May 2023 11:30:27 +0800 Subject: [PATCH] [feature-wip](multi-catalog)(step1)support connect to max compute (#19606) Issue Number: #19679 support connect to max compute metadata by odps sdk --- fe/fe-core/pom.xml | 5 + .../org/apache/doris/catalog/TableIf.java | 2 +- .../catalog/external/EsExternalDatabase.java | 120 +----------- .../catalog/external/EsExternalTable.java | 4 - .../catalog/external/ExternalDatabase.java | 109 ++++++++--- .../doris/catalog/external/ExternalTable.java | 2 +- .../catalog/external/HMSExternalDatabase.java | 118 +----------- .../external/IcebergExternalDatabase.java | 115 +----------- .../external/JdbcExternalDatabase.java | 121 +----------- .../external/MaxComputeExternalDatabase.java | 45 +++++ .../external/MaxComputeExternalTable.java | 176 ++++++++++++++++++ .../external/TestExternalDatabase.java | 121 +----------- .../doris/datasource/CatalogFactory.java | 3 + .../doris/datasource/EsExternalCatalog.java | 15 +- .../doris/datasource/ExternalCatalog.java | 161 ++++++++++------ .../doris/datasource/HMSExternalCatalog.java | 52 +----- .../doris/datasource/InitCatalogLog.java | 1 + .../doris/datasource/InitDatabaseLog.java | 2 + .../doris/datasource/JdbcExternalCatalog.java | 42 +---- .../datasource/MaxComputeExternalCatalog.java | 109 +++++++++++ .../iceberg/IcebergExternalCatalog.java | 49 +---- .../property/constants/MCProperties.java | 37 ++++ .../datasource/test/TestExternalCatalog.java | 50 +---- .../apache/doris/persist/gson/GsonUtils.java | 12 +- .../doris/planner/SingleNodePlanner.java | 7 + .../planner/external/FileQueryScanNode.java | 20 +- .../planner/external/MaxComputeScanNode.java | 80 ++++++++ .../doris/statistics/StatisticalType.java | 1 + gensrc/thrift/Descriptors.thrift | 7 + gensrc/thrift/Types.thrift | 1 + 30 files changed, 715 insertions(+), 872 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 779f3fab67..643ef1c7f4 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -407,6 +407,11 @@ under the License. + + com.aliyun.odps + odps-sdk-core + 0.43.3-public + org.springframework.boot diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 25c2319d5b..de3f4e917b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -134,7 +134,7 @@ public interface TableIf { enum TableType { MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC, TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE, - ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE; + ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE; public String toEngineName() { switch (this) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java index b69041952e..5d020789b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java @@ -17,35 +17,15 @@ package org.apache.doris.catalog.external; -import org.apache.doris.catalog.Env; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * Elasticsearch metastore external database. */ public class EsExternalDatabase extends ExternalDatabase implements GsonPostProcessable { - private static final Logger LOG = LogManager.getLogger(EsExternalDatabase.class); - - // Cache of table name to table id. - private Map tableNameToId = Maps.newConcurrentMap(); - @SerializedName(value = "idToTbl") - private Map idToTbl = Maps.newConcurrentMap(); /** * Create Elasticsearch external database. @@ -55,106 +35,12 @@ public class EsExternalDatabase extends ExternalDatabase implem * @param name database name. */ public EsExternalDatabase(ExternalCatalog extCatalog, long id, String name) { - super(extCatalog, id, name); - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newConcurrentMap(); - for (int i = 0; i < log.getRefreshCount(); i++) { - EsExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - for (int i = 0; i < log.getCreateCount(); i++) { - EsExternalTable table = new EsExternalTable(log.getCreateTableIds().get(i), - log.getCreateTableNames().get(i), name, (EsExternalCatalog) catalog); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - initialized = true; - } - - public void setTableExtCatalog(ExternalCatalog extCatalog) { - for (EsExternalTable table : idToTbl.values()) { - table.setCatalog(extCatalog); - } + super(extCatalog, id, name, InitDatabaseLog.Type.ES); } @Override - protected void init() { - InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); - initDatabaseLog.setType(InitDatabaseLog.Type.ES); - initDatabaseLog.setCatalogId(extCatalog.getId()); - initDatabaseLog.setDbId(id); - List tableNames = extCatalog.listTableNames(null, name); - if (tableNames != null) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newHashMap(); - for (String tableName : tableNames) { - long tblId; - if (tableNameToId != null && tableNameToId.containsKey(tableName)) { - tblId = tableNameToId.get(tableName); - tmpTableNameToId.put(tableName, tblId); - EsExternalTable table = idToTbl.get(tblId); - table.unsetObjectCreated(); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addRefreshTable(tblId); - } else { - tblId = Env.getCurrentEnv().getNextId(); - tmpTableNameToId.put(tableName, tblId); - EsExternalTable table = new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addCreateTable(tblId, tableName); - } - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - } - initialized = true; - Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); - } - - @Override - public Set getTableNamesWithLock() { - makeSureInitialized(); - return Sets.newHashSet(tableNameToId.keySet()); - } - - @Override - public List getTables() { - makeSureInitialized(); - return Lists.newArrayList(idToTbl.values()); - } - - @Override - public EsExternalTable getTableNullable(String tableName) { - makeSureInitialized(); - if (!tableNameToId.containsKey(tableName)) { - return null; - } - return idToTbl.get(tableNameToId.get(tableName)); - } - - @Override - public EsExternalTable getTableNullable(long tableId) { - makeSureInitialized(); - return idToTbl.get(tableId); - } - - public EsExternalTable getTableForReplay(long tableId) { - return idToTbl.get(tableId); - } - - @Override - public void gsonPostProcess() throws IOException { - tableNameToId = Maps.newConcurrentMap(); - for (EsExternalTable tbl : idToTbl.values()) { - tableNameToId.put(tbl.getName(), tbl.getId()); - } - rwLock = new ReentrantReadWriteLock(true); + protected EsExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog); } public void addTableForTest(EsExternalTable tbl) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index 0b2b2cd029..736b574500 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -26,9 +26,6 @@ import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.util.List; /** @@ -36,7 +33,6 @@ import java.util.List; */ public class EsExternalTable extends ExternalTable { - private static final Logger LOG = LogManager.getLogger(EsExternalTable.class); private EsTable esTable; /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index fec46d034c..bc16a766c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -32,6 +32,9 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.MasterCatalogExecutor; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; @@ -41,6 +44,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -50,8 +54,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * * @param External table type is ExternalTable or its subclass. */ -public class ExternalDatabase implements DatabaseIf, Writable, GsonPostProcessable { - +public abstract class ExternalDatabase + implements DatabaseIf, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class); protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); @@ -64,16 +68,14 @@ public class ExternalDatabase implements DatabaseIf, protected DatabaseProperty dbProperties = new DatabaseProperty(); @SerializedName(value = "initialized") protected boolean initialized = false; + // Cache of table name to table id. + protected Map tableNameToId = Maps.newConcurrentMap(); + @SerializedName(value = "idToTbl") + protected Map idToTbl = Maps.newConcurrentMap(); + protected final InitDatabaseLog.Type dbLogType; protected ExternalCatalog extCatalog; protected boolean invalidCacheInInit = true; - /** - * No args constructor for persist. - */ - public ExternalDatabase() { - initialized = false; - } - /** * Create external database. * @@ -81,10 +83,11 @@ public class ExternalDatabase implements DatabaseIf, * @param id Database id. * @param name Database name. */ - public ExternalDatabase(ExternalCatalog extCatalog, long id, String name) { + public ExternalDatabase(ExternalCatalog extCatalog, long id, String name, InitDatabaseLog.Type dbLogType) { this.extCatalog = extCatalog; this.id = id; this.name = name; + this.dbLogType = dbLogType; } public void setExtCatalog(ExternalCatalog extCatalog) { @@ -92,6 +95,9 @@ public class ExternalDatabase implements DatabaseIf, } public void setTableExtCatalog(ExternalCatalog extCatalog) { + for (T table : idToTbl.values()) { + table.setCatalog(extCatalog); + } } public void setUnInitialized(boolean invalidCache) { @@ -125,16 +131,61 @@ public class ExternalDatabase implements DatabaseIf, } } - protected void init() { - throw new NotImplementedException("init() is not implemented"); + public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { + Map tmpTableNameToId = Maps.newConcurrentMap(); + Map tmpIdToTbl = Maps.newConcurrentMap(); + for (int i = 0; i < log.getRefreshCount(); i++) { + T table = getTableForReplay(log.getRefreshTableIds().get(i)); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + for (int i = 0; i < log.getCreateCount(); i++) { + T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + initialized = true; } + protected void init() { + InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); + initDatabaseLog.setType(dbLogType); + initDatabaseLog.setCatalogId(extCatalog.getId()); + initDatabaseLog.setDbId(id); + List tableNames = extCatalog.listTableNames(null, name); + if (tableNames != null) { + Map tmpTableNameToId = Maps.newConcurrentMap(); + Map tmpIdToTbl = Maps.newHashMap(); + for (String tableName : tableNames) { + long tblId; + if (tableNameToId != null && tableNameToId.containsKey(tableName)) { + tblId = tableNameToId.get(tableName); + tmpTableNameToId.put(tableName, tblId); + T table = idToTbl.get(tblId); + table.unsetObjectCreated(); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addRefreshTable(tblId); + } else { + tblId = Env.getCurrentEnv().getNextId(); + tmpTableNameToId.put(tableName, tblId); + T table = getExternalTable(tableName, tblId, extCatalog); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addCreateTable(tblId, tableName); + } + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + } + initialized = true; + Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); + } + + protected abstract T getExternalTable(String tableName, long tblId, ExternalCatalog catalog); + public T getTableForReplay(long tableId) { - throw new NotImplementedException("getTableForReplay() is not implemented"); - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - throw new NotImplementedException("replayInitDb() is not implemented"); + return idToTbl.get(tableId); } @Override @@ -210,7 +261,8 @@ public class ExternalDatabase implements DatabaseIf, @Override public List getTables() { - throw new NotImplementedException("getTables() is not implemented"); + makeSureInitialized(); + return Lists.newArrayList(idToTbl.values()); } @Override @@ -235,17 +287,23 @@ public class ExternalDatabase implements DatabaseIf, @Override public Set getTableNamesWithLock() { - throw new NotImplementedException("getTableNamesWithLock() is not implemented"); + makeSureInitialized(); + return Sets.newHashSet(tableNameToId.keySet()); } @Override public T getTableNullable(String tableName) { - throw new NotImplementedException("getTableNullable() is not implemented"); + makeSureInitialized(); + if (!tableNameToId.containsKey(tableName)) { + return null; + } + return idToTbl.get(tableNameToId.get(tableName)); } @Override public T getTableNullable(long tableId) { - throw new NotImplementedException("getTableNullable() is not implemented"); + makeSureInitialized(); + return idToTbl.get(tableId); } @Override @@ -253,13 +311,20 @@ public class ExternalDatabase implements DatabaseIf, Text.writeString(out, GsonUtils.GSON.toJson(this)); } + @SuppressWarnings("rawtypes") public static ExternalDatabase read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ExternalDatabase.class); } @Override - public void gsonPostProcess() throws IOException {} + public void gsonPostProcess() throws IOException { + tableNameToId = Maps.newConcurrentMap(); + for (T tbl : idToTbl.values()) { + tableNameToId.put(tbl.getName(), tbl.getId()); + } + rwLock = new ReentrantReadWriteLock(true); + } @Override public void dropTable(String tableName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 2a72f4cf6e..7a8ff075e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -67,7 +67,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @SerializedName(value = "dbName") protected String dbName; - protected boolean objectCreated = false; + protected boolean objectCreated; protected ExternalCatalog catalog; protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index 84cf3c4af7..01c62ee1ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -17,26 +17,17 @@ package org.apache.doris.catalog.external; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -45,11 +36,6 @@ import java.util.stream.Collectors; public class HMSExternalDatabase extends ExternalDatabase implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(HMSExternalDatabase.class); - // Cache of table name to table id. - private Map tableNameToId = Maps.newConcurrentMap(); - @SerializedName(value = "idToTbl") - private Map idToTbl = Maps.newConcurrentMap(); - /** * Create HMS external database. * @@ -58,73 +44,12 @@ public class HMSExternalDatabase extends ExternalDatabase impl * @param name database name. */ public HMSExternalDatabase(ExternalCatalog extCatalog, long id, String name) { - super(extCatalog, id, name); - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newConcurrentMap(); - for (int i = 0; i < log.getRefreshCount(); i++) { - HMSExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - for (int i = 0; i < log.getCreateCount(); i++) { - HMSExternalTable table = new HMSExternalTable(log.getCreateTableIds().get(i), - log.getCreateTableNames().get(i), name, (HMSExternalCatalog) catalog); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - initialized = true; - } - - public void setTableExtCatalog(ExternalCatalog extCatalog) { - for (HMSExternalTable table : idToTbl.values()) { - table.setCatalog(extCatalog); - } + super(extCatalog, id, name, InitDatabaseLog.Type.HMS); } @Override - protected void init() { - InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); - initDatabaseLog.setType(InitDatabaseLog.Type.HMS); - initDatabaseLog.setCatalogId(extCatalog.getId()); - initDatabaseLog.setDbId(id); - List tableNames = extCatalog.listTableNames(null, name); - if (tableNames != null) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newHashMap(); - for (String tableName : tableNames) { - long tblId; - if (tableNameToId != null && tableNameToId.containsKey(tableName)) { - tblId = tableNameToId.get(tableName); - tmpTableNameToId.put(tableName, tblId); - HMSExternalTable table = idToTbl.get(tblId); - table.unsetObjectCreated(); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addRefreshTable(tblId); - } else { - tblId = Env.getCurrentEnv().getNextId(); - tmpTableNameToId.put(tableName, tblId); - HMSExternalTable table = new HMSExternalTable(tblId, tableName, name, - (HMSExternalCatalog) extCatalog); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addCreateTable(tblId, tableName); - } - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - } - initialized = true; - Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); - } - - @Override - public List getTables() { - makeSureInitialized(); - return Lists.newArrayList(idToTbl.values()); + protected HMSExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog); } @Override @@ -133,40 +58,6 @@ public class HMSExternalDatabase extends ExternalDatabase impl return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList()); } - @Override - public Set getTableNamesWithLock() { - makeSureInitialized(); - return Sets.newHashSet(tableNameToId.keySet()); - } - - @Override - public HMSExternalTable getTableNullable(String tableName) { - makeSureInitialized(); - if (!tableNameToId.containsKey(tableName)) { - return null; - } - return idToTbl.get(tableNameToId.get(tableName)); - } - - @Override - public HMSExternalTable getTableNullable(long tableId) { - makeSureInitialized(); - return idToTbl.get(tableId); - } - - public HMSExternalTable getTableForReplay(long tableId) { - return idToTbl.get(tableId); - } - - @Override - public void gsonPostProcess() throws IOException { - tableNameToId = Maps.newConcurrentMap(); - for (HMSExternalTable tbl : idToTbl.values()) { - tableNameToId.put(tbl.getName(), tbl.getId()); - } - rwLock = new ReentrantReadWriteLock(true); - } - public void addTableForTest(HMSExternalTable tbl) { idToTbl.put(tbl.getId(), tbl); tableNameToId.put(tbl.getName(), tbl.getId()); @@ -188,8 +79,7 @@ public class HMSExternalDatabase extends ExternalDatabase impl LOG.debug("create table [{}]", tableName); makeSureInitialized(); tableNameToId.put(tableName, tableId); - HMSExternalTable table = new HMSExternalTable(tableId, tableName, name, - (HMSExternalCatalog) extCatalog); + HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog); idToTbl.put(tableId, table); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java index db1d1e13ce..96175487e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java @@ -17,104 +17,30 @@ package org.apache.doris.catalog.external; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; public class IcebergExternalDatabase extends ExternalDatabase implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(IcebergExternalDatabase.class); - // Cache of table name to table id. - private Map tableNameToId = Maps.newConcurrentMap(); - @SerializedName(value = "idToTbl") - private Map idToTbl = Maps.newConcurrentMap(); public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String name) { - super(extCatalog, id, name); - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newConcurrentMap(); - for (int i = 0; i < log.getRefreshCount(); i++) { - IcebergExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - for (int i = 0; i < log.getCreateCount(); i++) { - IcebergExternalTable table = new IcebergExternalTable(log.getCreateTableIds().get(i), - log.getCreateTableNames().get(i), name, (IcebergExternalCatalog) catalog); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - initialized = true; - } - - public void setTableExtCatalog(ExternalCatalog extCatalog) { - for (IcebergExternalTable table : idToTbl.values()) { - table.setCatalog(extCatalog); - } + super(extCatalog, id, name, InitDatabaseLog.Type.ICEBERG); } @Override - protected void init() { - InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); - initDatabaseLog.setType(InitDatabaseLog.Type.HMS); - initDatabaseLog.setCatalogId(extCatalog.getId()); - initDatabaseLog.setDbId(id); - List tableNames = extCatalog.listTableNames(null, name); - if (tableNames != null) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newHashMap(); - for (String tableName : tableNames) { - long tblId; - if (tableNameToId != null && tableNameToId.containsKey(tableName)) { - tblId = tableNameToId.get(tableName); - tmpTableNameToId.put(tableName, tblId); - IcebergExternalTable table = idToTbl.get(tblId); - table.unsetObjectCreated(); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addRefreshTable(tblId); - } else { - tblId = Env.getCurrentEnv().getNextId(); - tmpTableNameToId.put(tableName, tblId); - IcebergExternalTable table = new IcebergExternalTable(tblId, tableName, name, - (IcebergExternalCatalog) extCatalog); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addCreateTable(tblId, tableName); - } - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - } - initialized = true; - Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); - } - - @Override - public List getTables() { - makeSureInitialized(); - return Lists.newArrayList(idToTbl.values()); + protected IcebergExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new IcebergExternalTable(tblId, tableName, name, (IcebergExternalCatalog) extCatalog); } @Override @@ -123,41 +49,6 @@ public class IcebergExternalDatabase extends ExternalDatabase getTableNamesWithLock() { - makeSureInitialized(); - return Sets.newHashSet(tableNameToId.keySet()); - } - - @Override - public IcebergExternalTable getTableNullable(String tableName) { - makeSureInitialized(); - if (!tableNameToId.containsKey(tableName)) { - return null; - } - return idToTbl.get(tableNameToId.get(tableName)); - } - - @Override - public IcebergExternalTable getTableNullable(long tableId) { - makeSureInitialized(); - return idToTbl.get(tableId); - } - - @Override - public IcebergExternalTable getTableForReplay(long tableId) { - return idToTbl.get(tableId); - } - - @Override - public void gsonPostProcess() throws IOException { - tableNameToId = Maps.newConcurrentMap(); - for (IcebergExternalTable tbl : idToTbl.values()) { - tableNameToId.put(tbl.getName(), tbl.getId()); - } - rwLock = new ReentrantReadWriteLock(true); - } - @Override public void dropTable(String tableName) { LOG.debug("drop table [{}]", tableName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java index f04a389570..e60ca3dfe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalDatabase.java @@ -17,32 +17,12 @@ package org.apache.doris.catalog.external; -import org.apache.doris.catalog.Env; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.datasource.JdbcExternalCatalog; import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - public class JdbcExternalDatabase extends ExternalDatabase implements GsonPostProcessable { - private static final Logger LOG = LogManager.getLogger(JdbcExternalDatabase.class); - - // Cache of table name to table id. - private Map tableNameToId = Maps.newConcurrentMap(); - @SerializedName(value = "idToTbl") - private Map idToTbl = Maps.newConcurrentMap(); /** * Create Jdbc external database. @@ -52,107 +32,12 @@ public class JdbcExternalDatabase extends ExternalDatabase im * @param name database name. */ public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name) { - super(extCatalog, id, name); + super(extCatalog, id, name, InitDatabaseLog.Type.JDBC); } @Override - protected void init() { - InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); - initDatabaseLog.setType(InitDatabaseLog.Type.JDBC); - initDatabaseLog.setCatalogId(extCatalog.getId()); - initDatabaseLog.setDbId(id); - List tableNames = extCatalog.listTableNames(null, name); - if (tableNames != null) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newHashMap(); - for (String tableName : tableNames) { - long tblId; - if (tableNameToId != null && tableNameToId.containsKey(tableName)) { - tblId = tableNameToId.get(tableName); - tmpTableNameToId.put(tableName, tblId); - JdbcExternalTable table = idToTbl.get(tblId); - table.unsetObjectCreated(); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addRefreshTable(tblId); - } else { - tblId = Env.getCurrentEnv().getNextId(); - tmpTableNameToId.put(tableName, tblId); - JdbcExternalTable table = new JdbcExternalTable(tblId, tableName, name, - (JdbcExternalCatalog) extCatalog); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addCreateTable(tblId, tableName); - } - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - } - initialized = true; - Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); - } - - public void setTableExtCatalog(ExternalCatalog extCatalog) { - for (JdbcExternalTable table : idToTbl.values()) { - table.setCatalog(extCatalog); - } - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newConcurrentMap(); - for (int i = 0; i < log.getRefreshCount(); i++) { - JdbcExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - for (int i = 0; i < log.getCreateCount(); i++) { - JdbcExternalTable table = new JdbcExternalTable(log.getCreateTableIds().get(i), - log.getCreateTableNames().get(i), name, (JdbcExternalCatalog) catalog); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - initialized = true; - } - - @Override - public Set getTableNamesWithLock() { - makeSureInitialized(); - return Sets.newHashSet(tableNameToId.keySet()); - } - - @Override - public List getTables() { - makeSureInitialized(); - return Lists.newArrayList(idToTbl.values()); - } - - @Override - public JdbcExternalTable getTableNullable(String tableName) { - makeSureInitialized(); - if (!tableNameToId.containsKey(tableName)) { - return null; - } - return idToTbl.get(tableNameToId.get(tableName)); - } - - @Override - public JdbcExternalTable getTableNullable(long tableId) { - makeSureInitialized(); - return idToTbl.get(tableId); - } - - public JdbcExternalTable getTableForReplay(long tableId) { - return idToTbl.get(tableId); - } - - @Override - public void gsonPostProcess() throws IOException { - tableNameToId = Maps.newConcurrentMap(); - for (JdbcExternalTable tbl : idToTbl.values()) { - tableNameToId.put(tbl.getName(), tbl.getId()); - } - rwLock = new ReentrantReadWriteLock(true); + protected JdbcExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog); } public void addTableForTest(JdbcExternalTable tbl) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java new file mode 100644 index 0000000000..56e892eab1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalDatabase.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.external; + +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.MaxComputeExternalCatalog; +import org.apache.doris.persist.gson.GsonPostProcessable; + +/** + * MaxCompute external database. + */ +public class MaxComputeExternalDatabase extends ExternalDatabase + implements GsonPostProcessable { + /** + * Create MaxCompute external database. + * + * @param extCatalog External catalog this database belongs to. + * @param id database id. + * @param name database name. + */ + public MaxComputeExternalDatabase(ExternalCatalog extCatalog, long id, String name) { + super(extCatalog, id, name, InitDatabaseLog.Type.MAX_COMPUTE); + } + + @Override + protected MaxComputeExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new MaxComputeExternalTable(tblId, tableName, name, (MaxComputeExternalCatalog) extCatalog); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java new file mode 100644 index 0000000000..5de781a686 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.external; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.MaxComputeExternalCatalog; +import org.apache.doris.thrift.TMCTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.Table; +import com.aliyun.odps.type.ArrayTypeInfo; +import com.aliyun.odps.type.DecimalTypeInfo; +import com.aliyun.odps.type.MapTypeInfo; +import com.aliyun.odps.type.StructTypeInfo; +import com.aliyun.odps.type.TypeInfo; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; + +/** + * MaxCompute external table. + */ +public class MaxComputeExternalTable extends ExternalTable { + + private Table odpsTable; + + public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { + super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); + } + + @Override + protected synchronized void makeSureInitialized() { + super.makeSureInitialized(); + if (!objectCreated) { + odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); + objectCreated = true; + } + } + + @Override + public List initSchema() { + makeSureInitialized(); + List columns = odpsTable.getSchema().getColumns(); + List result = Lists.newArrayListWithCapacity(columns.size()); + for (com.aliyun.odps.Column field : columns) { + result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, + true, field.getComment(), true, -1)); + } + return result; + } + + private Type mcTypeToDorisType(TypeInfo typeInfo) { + OdpsType odpsType = typeInfo.getOdpsType(); + switch (odpsType) { + case VOID: { + return Type.NULL; + } + case BOOLEAN: { + return Type.BOOLEAN; + } + case TINYINT: { + return Type.TINYINT; + } + case SMALLINT: { + return Type.SMALLINT; + } + case INT: { + return Type.INT; + } + case BIGINT: { + return Type.BIGINT; + } + case CHAR: { + return Type.CHAR; + } + case VARCHAR: { + return Type.VARCHAR; + } + case STRING: { + return Type.STRING; + } + case JSON: { + return Type.UNSUPPORTED; + // return Type.JSONB; + } + case FLOAT: { + return Type.FLOAT; + } + case DOUBLE: { + return Type.DOUBLE; + } + case DECIMAL: { + DecimalTypeInfo decimal = (DecimalTypeInfo) typeInfo; + return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); + } + case DATE: { + return ScalarType.createDateV2Type(); + } + case DATETIME: + case TIMESTAMP: { + return ScalarType.createDatetimeV2Type(3); + } + case ARRAY: { + ArrayTypeInfo arrayType = (ArrayTypeInfo) typeInfo; + Type innerType = mcTypeToDorisType(arrayType.getElementTypeInfo()); + return ArrayType.create(innerType, true); + } + case MAP: { + MapTypeInfo mapType = (MapTypeInfo) typeInfo; + return new MapType(mcTypeToDorisType(mapType.getKeyTypeInfo()), + mcTypeToDorisType(mapType.getValueTypeInfo())); + } + case STRUCT: { + ArrayList fields = new ArrayList<>(); + StructTypeInfo structType = (StructTypeInfo) typeInfo; + List fieldNames = structType.getFieldNames(); + List fieldTypeInfos = structType.getFieldTypeInfos(); + for (int i = 0; i < structType.getFieldCount(); i++) { + Type innerType = mcTypeToDorisType(fieldTypeInfos.get(i)); + fields.add(new StructField(fieldNames.get(i), innerType)); + } + return new StructType(fields); + } + case BINARY: + case INTERVAL_DAY_TIME: + case INTERVAL_YEAR_MONTH: + return Type.UNSUPPORTED; + default: + throw new IllegalArgumentException("Cannot transform unknown type: " + odpsType); + } + } + + @Override + public TTableDescriptor toThrift() { + List schema = getFullSchema(); + TMCTable tMcTable = new TMCTable(); + tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl()); + // use mc project as dbName + tMcTable.setProject(dbName); + tMcTable.setTable(name); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE, + schema.size(), 0, getName(), dbName); + tTableDescriptor.setMcTable(tMcTable); + return tTableDescriptor; + } + + @Override + public String getMysqlType() { + return "BASE TABLE"; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java index fe1852241d..fbac107f45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/TestExternalDatabase.java @@ -17,134 +17,19 @@ package org.apache.doris.catalog.external; -import org.apache.doris.catalog.Env; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.datasource.test.TestExternalCatalog; import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - public class TestExternalDatabase extends ExternalDatabase implements GsonPostProcessable { - private static final Logger LOG = LogManager.getLogger(TestExternalDatabase.class); - - // Cache of table name to table id. - private Map tableNameToId = Maps.newConcurrentMap(); - @SerializedName(value = "idToTbl") - private Map idToTbl = Maps.newConcurrentMap(); public TestExternalDatabase(ExternalCatalog extCatalog, long id, String name) { - super(extCatalog, id, name); + super(extCatalog, id, name, InitDatabaseLog.Type.TEST); } @Override - protected void init() { - InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); - initDatabaseLog.setType(InitDatabaseLog.Type.TEST); - initDatabaseLog.setCatalogId(extCatalog.getId()); - initDatabaseLog.setDbId(id); - List tableNames = extCatalog.listTableNames(null, name); - if (tableNames != null) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newHashMap(); - for (String tableName : tableNames) { - long tblId; - if (tableNameToId != null && tableNameToId.containsKey(tableName)) { - tblId = tableNameToId.get(tableName); - tmpTableNameToId.put(tableName, tblId); - TestExternalTable table = idToTbl.get(tblId); - table.unsetObjectCreated(); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addRefreshTable(tblId); - } else { - tblId = Env.getCurrentEnv().getNextId(); - tmpTableNameToId.put(tableName, tblId); - TestExternalTable table = new TestExternalTable(tblId, tableName, name, - (TestExternalCatalog) extCatalog); - tmpIdToTbl.put(tblId, table); - initDatabaseLog.addCreateTable(tblId, tableName); - } - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - } - initialized = true; - Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); - } - - public void setTableExtCatalog(ExternalCatalog extCatalog) { - for (TestExternalTable table : idToTbl.values()) { - table.setCatalog(extCatalog); - } - } - - public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { - Map tmpTableNameToId = Maps.newConcurrentMap(); - Map tmpIdToTbl = Maps.newConcurrentMap(); - for (int i = 0; i < log.getRefreshCount(); i++) { - TestExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - for (int i = 0; i < log.getCreateCount(); i++) { - TestExternalTable table = new TestExternalTable(log.getCreateTableIds().get(i), - log.getCreateTableNames().get(i), name, (TestExternalCatalog) catalog); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); - } - tableNameToId = tmpTableNameToId; - idToTbl = tmpIdToTbl; - initialized = true; - } - - @Override - public Set getTableNamesWithLock() { - makeSureInitialized(); - return Sets.newHashSet(tableNameToId.keySet()); - } - - @Override - public List getTables() { - makeSureInitialized(); - return Lists.newArrayList(idToTbl.values()); - } - - @Override - public TestExternalTable getTableNullable(String tableName) { - makeSureInitialized(); - if (!tableNameToId.containsKey(tableName)) { - return null; - } - return idToTbl.get(tableNameToId.get(tableName)); - } - - @Override - public TestExternalTable getTableNullable(long tableId) { - makeSureInitialized(); - return idToTbl.get(tableId); - } - - public TestExternalTable getTableForReplay(long tableId) { - return idToTbl.get(tableId); - } - - @Override - public void gsonPostProcess() throws IOException { - tableNameToId = Maps.newConcurrentMap(); - for (TestExternalTable tbl : idToTbl.values()) { - tableNameToId.put(tbl.getName(), tbl.getId()); - } - rwLock = new ReentrantReadWriteLock(true); + protected TestExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new TestExternalTable(tblId, tableName, name, (TestExternalCatalog) extCatalog); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index 2da0fc4dd1..4a52b9b110 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -113,6 +113,9 @@ public class CatalogFactory { case "iceberg": catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props); break; + case "max_compute": + catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props); + break; case "test": if (!FeConstants.runningUnitTest) { throw new DdlException("test catalog is only for FE unit test"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java index 52f9bc984f..0b81973d31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java @@ -20,6 +20,8 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EsResource; import org.apache.doris.catalog.external.EsExternalDatabase; +import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.external.elasticsearch.EsRestClient; import com.google.common.collect.Lists; @@ -29,7 +31,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,7 +48,7 @@ public class EsExternalCatalog extends ExternalCatalog { * Default constructor for EsExternalCatalog. */ public EsExternalCatalog(long catalogId, String name, String resource, Map props) { - super(catalogId, name); + super(catalogId, name, InitCatalogLog.Type.ES); this.type = "es"; this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props)); } @@ -124,7 +125,7 @@ public class EsExternalCatalog extends ExternalCatalog { protected void init() { InitCatalogLog initCatalogLog = new InitCatalogLog(); initCatalogLog.setCatalogId(id); - initCatalogLog.setType(InitCatalogLog.Type.ES); + initCatalogLog.setType(logType); if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) { idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(invalidCacheInInit); initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB)); @@ -133,19 +134,13 @@ public class EsExternalCatalog extends ExternalCatalog { idToDb = Maps.newConcurrentMap(); long defaultDbId = Env.getCurrentEnv().getNextId(); dbNameToId.put(DEFAULT_DB, defaultDbId); - EsExternalDatabase db = new EsExternalDatabase(this, defaultDbId, DEFAULT_DB); + ExternalDatabase db = getDbForInit(DEFAULT_DB, defaultDbId, logType); idToDb.put(defaultDbId, db); initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB); } Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); } - @Override - public List listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return new ArrayList<>(dbNameToId.keySet()); - } - @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index adcb109ace..6198796688 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -26,6 +26,8 @@ import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.catalog.external.IcebergExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalDatabase; +import org.apache.doris.catalog.external.MaxComputeExternalDatabase; +import org.apache.doris.catalog.external.TestExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; @@ -51,6 +53,7 @@ import org.jetbrains.annotations.Nullable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -59,7 +62,8 @@ import java.util.Optional; * The abstract class for all types of external catalogs. */ @Data -public abstract class ExternalCatalog implements CatalogIf, Writable, GsonPostProcessable { +public abstract class ExternalCatalog + implements CatalogIf>, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class); // Unique id of this catalog, will be assigned after catalog is loaded. @@ -69,13 +73,15 @@ public abstract class ExternalCatalog implements CatalogIf, Wr protected String name; @SerializedName(value = "type") protected String type; + // TODO: replace type with log type + protected final InitCatalogLog.Type logType; // save properties of this catalog, such as hive meta store url. @SerializedName(value = "catalogProperty") protected CatalogProperty catalogProperty; @SerializedName(value = "initialized") private boolean initialized = false; @SerializedName(value = "idToDb") - protected Map idToDb = Maps.newConcurrentMap(); + protected Map> idToDb = Maps.newConcurrentMap(); // db name does not contains "default_cluster" protected Map dbNameToId = Maps.newConcurrentMap(); private boolean objectCreated = false; @@ -84,15 +90,25 @@ public abstract class ExternalCatalog implements CatalogIf, Wr private ExternalSchemaCache schemaCache; private String comment; - public ExternalCatalog(long catalogId, String name) { + public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType) { this.id = catalogId; this.name = name; + this.logType = logType; + } + + protected List listDatabaseNames() { + throw new UnsupportedOperationException("Unsupported operation: " + + "listDatabaseNames from remote client when init catalog with " + logType.name()); } /** * @return names of database in this catalog. */ - public abstract List listDatabaseNames(SessionContext ctx); + // public abstract List listDatabaseNames(SessionContext ctx); + public List listDatabaseNames(SessionContext ctx) { + makeSureInitialized(); + return new ArrayList<>(dbNameToId.keySet()); + } /** * @param dbName @@ -204,7 +220,43 @@ public abstract class ExternalCatalog implements CatalogIf, Wr } // init schema related objects - protected abstract void init(); + protected void init() { + Map tmpDbNameToId = Maps.newConcurrentMap(); + Map> tmpIdToDb = Maps.newConcurrentMap(); + InitCatalogLog initCatalogLog = new InitCatalogLog(); + initCatalogLog.setCatalogId(id); + initCatalogLog.setType(logType); + List allDatabases = listDatabaseNames(); + Map includeDatabaseMap = getIncludeDatabaseMap(); + Map excludeDatabaseMap = getExcludeDatabaseMap(); + for (String dbName : allDatabases) { + // Exclude database map take effect with higher priority over include database map + if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) { + continue; + } + if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) { + continue; + } + long dbId; + if (dbNameToId != null && dbNameToId.containsKey(dbName)) { + dbId = dbNameToId.get(dbName); + tmpDbNameToId.put(dbName, dbId); + ExternalDatabase db = idToDb.get(dbId); + db.setUnInitialized(invalidCacheInInit); + tmpIdToDb.put(dbId, db); + initCatalogLog.addRefreshDb(dbId); + } else { + dbId = Env.getCurrentEnv().getNextId(); + tmpDbNameToId.put(dbName, dbId); + ExternalDatabase db = getDbForInit(dbName, dbId, logType); + tmpIdToDb.put(dbId, db); + initCatalogLog.addCreateDb(dbId, dbName); + } + } + dbNameToId = tmpDbNameToId; + idToDb = tmpIdToDb; + Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); + } public void setUninitialized(boolean invalidCache) { this.objectCreated = false; @@ -219,17 +271,13 @@ public abstract class ExternalCatalog implements CatalogIf, Wr Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); } - public ExternalDatabase getDbForReplay(long dbId) { - return idToDb.get(dbId); - } - public final List getSchema(String dbName, String tblName) { makeSureInitialized(); - Optional db = getDb(dbName); + Optional> db = getDb(dbName); if (db.isPresent()) { - Optional table = db.get().getTable(tblName); + Optional table = db.get().getTable(tblName); if (table.isPresent()) { - return ((ExternalTable) table.get()).initSchema(); + return table.get().initSchema(); } } // return one column with unsupported type. @@ -287,7 +335,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr @Nullable @Override - public ExternalDatabase getDbNullable(String dbName) { + public ExternalDatabase getDbNullable(String dbName) { try { makeSureInitialized(); } catch (Exception e) { @@ -303,7 +351,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr @Nullable @Override - public ExternalDatabase getDbNullable(long dbId) { + public ExternalDatabase getDbNullable(long dbId) { try { makeSureInitialized(); } catch (Exception e) { @@ -358,54 +406,51 @@ public abstract class ExternalCatalog implements CatalogIf, Wr public void replayInitCatalog(InitCatalogLog log) { Map tmpDbNameToId = Maps.newConcurrentMap(); - Map tmpIdToDb = Maps.newConcurrentMap(); + Map> tmpIdToDb = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { - ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i)); + ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i)); db.setUnInitialized(invalidCacheInInit); tmpDbNameToId.put(db.getFullName(), db.getId()); tmpIdToDb.put(db.getId(), db); } - switch (log.getType()) { - case HMS: - for (int i = 0; i < log.getCreateCount(); i++) { - HMSExternalDatabase db = new HMSExternalDatabase( - this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); - tmpDbNameToId.put(db.getFullName(), db.getId()); - tmpIdToDb.put(db.getId(), db); - } - break; - case ES: - for (int i = 0; i < log.getCreateCount(); i++) { - EsExternalDatabase db = new EsExternalDatabase( - this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); - tmpDbNameToId.put(db.getFullName(), db.getId()); - tmpIdToDb.put(db.getId(), db); - } - break; - case JDBC: - for (int i = 0; i < log.getCreateCount(); i++) { - JdbcExternalDatabase db = new JdbcExternalDatabase( - this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); - tmpDbNameToId.put(db.getFullName(), db.getId()); - tmpIdToDb.put(db.getId(), db); - } - break; - case ICEBERG: - for (int i = 0; i < log.getCreateCount(); i++) { - IcebergExternalDatabase db = new IcebergExternalDatabase( - this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); - tmpDbNameToId.put(db.getFullName(), db.getId()); - tmpIdToDb.put(db.getId(), db); - } - break; - default: - break; + for (int i = 0; i < log.getCreateCount(); i++) { + ExternalDatabase db = + getDbForInit(log.getCreateDbNames().get(i), log.getCreateDbIds().get(i), log.getType()); + if (db != null) { + tmpDbNameToId.put(db.getFullName(), db.getId()); + tmpIdToDb.put(db.getId(), db); + } } dbNameToId = tmpDbNameToId; idToDb = tmpIdToDb; initialized = true; } + public ExternalDatabase getDbForReplay(long dbId) { + return idToDb.get(dbId); + } + + protected ExternalDatabase getDbForInit(String dbName, long dbId, + InitCatalogLog.Type logType) { + switch (logType) { + case HMS: + return new HMSExternalDatabase(this, dbId, dbName); + case ES: + return new EsExternalDatabase(this, dbId, dbName); + case JDBC: + return new JdbcExternalDatabase(this, dbId, dbName); + case ICEBERG: + return new IcebergExternalDatabase(this, dbId, dbName); + case MAX_COMPUTE: + return new MaxComputeExternalDatabase(this, dbId, dbName); + case TEST: + return new TestExternalDatabase(this, dbId, dbName); + default: + break; + } + return null; + } + /** * External catalog has no cluster semantics. */ @@ -425,7 +470,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr idToDb = Maps.newConcurrentMap(); } dbNameToId = Maps.newConcurrentMap(); - for (ExternalDatabase db : idToDb.values()) { + for (ExternalDatabase db : idToDb.values()) { dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); db.setExtCatalog(this); db.setTableExtCatalog(this); @@ -436,7 +481,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr } } - public void addDatabaseForTest(ExternalDatabase db) { + public void addDatabaseForTest(ExternalDatabase db) { idToDb.put(db.getId(), db); dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } @@ -449,15 +494,15 @@ public abstract class ExternalCatalog implements CatalogIf, Wr throw new NotImplementedException("createDatabase not implemented"); } - public Map getIncludeDatabaseMap() { + public Map getIncludeDatabaseMap() { return getSpecifiedDatabaseMap(Resource.INCLUDE_DATABASE_LIST); } - public Map getExcludeDatabaseMap() { + public Map getExcludeDatabaseMap() { return getSpecifiedDatabaseMap(Resource.EXCLUDE_DATABASE_LIST); } - public Map getSpecifiedDatabaseMap(String catalogPropertyKey) { + public Map getSpecifiedDatabaseMap(String catalogPropertyKey) { String specifiedDatabaseList = catalogProperty.getOrDefault(catalogPropertyKey, ""); Map specifiedDatabaseMap = Maps.newHashMap(); specifiedDatabaseList = specifiedDatabaseList.trim(); @@ -465,8 +510,8 @@ public abstract class ExternalCatalog implements CatalogIf, Wr return specifiedDatabaseMap; } String[] databaseList = specifiedDatabaseList.split(","); - for (int i = 0; i < databaseList.length; i++) { - String dbname = databaseList[i].trim(); + for (String database : databaseList) { + String dbname = database.trim(); if (!dbname.isEmpty()) { specifiedDatabaseMap.put(dbname, true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index f5b039a7cf..7846847c83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.AuthType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -31,7 +32,6 @@ import org.apache.doris.datasource.property.constants.HMSProperties; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -70,7 +70,7 @@ public class HMSExternalCatalog extends ExternalCatalog { * Default constructor for HMSExternalCatalog. */ public HMSExternalCatalog(long catalogId, String name, String resource, Map props) { - super(catalogId, name); + super(catalogId, name, InitCatalogLog.Type.HMS); this.type = "hms"; props = PropertyConverter.convertToMetaProperties(props); catalogProperty = new CatalogProperty(resource, props); @@ -122,44 +122,8 @@ public class HMSExternalCatalog extends ExternalCatalog { return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); } - @Override - protected void init() { - Map tmpDbNameToId = Maps.newConcurrentMap(); - Map tmpIdToDb = Maps.newConcurrentMap(); - InitCatalogLog initCatalogLog = new InitCatalogLog(); - initCatalogLog.setCatalogId(id); - initCatalogLog.setType(InitCatalogLog.Type.HMS); - List allDatabases = client.getAllDatabases(); - Map includeDatabaseMap = getIncludeDatabaseMap(); - Map excludeDatabaseMap = getExcludeDatabaseMap(); - // Update the db name to id map. - for (String dbName : allDatabases) { - // Exclude database map take effect with higher priority over include database map - if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) { - continue; - } - if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) { - continue; - } - long dbId; - if (dbNameToId != null && dbNameToId.containsKey(dbName)) { - dbId = dbNameToId.get(dbName); - tmpDbNameToId.put(dbName, dbId); - ExternalDatabase db = idToDb.get(dbId); - db.setUnInitialized(invalidCacheInInit); - tmpIdToDb.put(dbId, db); - initCatalogLog.addRefreshDb(dbId); - } else { - dbId = Env.getCurrentEnv().getNextId(); - tmpDbNameToId.put(dbName, dbId); - HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName); - tmpIdToDb.put(dbId, db); - initCatalogLog.addCreateDb(dbId, dbName); - } - } - dbNameToId = tmpDbNameToId; - idToDb = tmpIdToDb; - Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); + protected List listDatabaseNames() { + return client.getAllDatabases(); } @Override @@ -193,12 +157,6 @@ public class HMSExternalCatalog extends ExternalCatalog { client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE); } - @Override - public List listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return Lists.newArrayList(dbNameToId.keySet()); - } - @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); @@ -292,7 +250,7 @@ public class HMSExternalCatalog extends ExternalCatalog { makeSureInitialized(); LOG.debug("create database [{}]", dbName); dbNameToId.put(dbName, dbId); - HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName); + ExternalDatabase db = getDbForInit(dbName, dbId, logType); idToDb.put(dbId, db); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java index b9d0ce185d..d20a15d837 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable { ES, JDBC, ICEBERG, + MAX_COMPUTE, TEST, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java index 9036353d30..9c3ad72b7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java @@ -34,8 +34,10 @@ import java.util.List; public class InitDatabaseLog implements Writable { public enum Type { HMS, + ICEBERG, ES, JDBC, + MAX_COMPUTE, TEST, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java index 3ce9a1ad94..ec898af623 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java @@ -17,9 +17,7 @@ package org.apache.doris.datasource; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; -import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.common.DdlException; import org.apache.doris.external.jdbc.JdbcClient; @@ -51,7 +49,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { public JdbcExternalCatalog(long catalogId, String name, String resource, Map props) throws DdlException { - super(catalogId, name); + super(catalogId, name, InitCatalogLog.Type.JDBC); this.type = "jdbc"; this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props)); } @@ -140,40 +138,8 @@ public class JdbcExternalCatalog extends ExternalCatalog { getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap()); } - @Override - protected void init() { - Map tmpDbNameToId = Maps.newConcurrentMap(); - Map tmpIdToDb = Maps.newConcurrentMap(); - InitCatalogLog initCatalogLog = new InitCatalogLog(); - initCatalogLog.setCatalogId(id); - initCatalogLog.setType(InitCatalogLog.Type.JDBC); - List allDatabaseNames = jdbcClient.getDatabaseNameList(); - for (String dbName : allDatabaseNames) { - long dbId; - if (dbNameToId != null && dbNameToId.containsKey(dbName)) { - dbId = dbNameToId.get(dbName); - tmpDbNameToId.put(dbName, dbId); - ExternalDatabase db = idToDb.get(dbId); - db.setUnInitialized(invalidCacheInInit); - tmpIdToDb.put(dbId, db); - initCatalogLog.addRefreshDb(dbId); - } else { - dbId = Env.getCurrentEnv().getNextId(); - tmpDbNameToId.put(dbName, dbId); - JdbcExternalDatabase db = new JdbcExternalDatabase(this, dbId, dbName); - tmpIdToDb.put(dbId, db); - initCatalogLog.addCreateDb(dbId, dbName); - } - } - dbNameToId = tmpDbNameToId; - idToDb = tmpIdToDb; - Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); - } - - @Override - public List listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return Lists.newArrayList(dbNameToId.keySet()); + protected List listDatabaseNames() { + return jdbcClient.getDatabaseNameList(); } @Override @@ -182,7 +148,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { JdbcExternalDatabase db = (JdbcExternalDatabase) idToDb.get(dbNameToId.get(dbName)); if (db != null && db.isInitialized()) { List names = Lists.newArrayList(); - db.getTables().stream().forEach(table -> names.add(table.getName())); + db.getTables().forEach(table -> names.add(table.getName())); return names; } else { return jdbcClient.getTablesNameList(dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java new file mode 100644 index 0000000000..1b925df791 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.datasource.credentials.CloudCredential; +import org.apache.doris.datasource.property.constants.MCProperties; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.account.Account; +import com.aliyun.odps.account.AliyunAccount; +import com.google.common.base.Strings; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class MaxComputeExternalCatalog extends ExternalCatalog { + private Odps odps; + private String tunnelUrl; + private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api"; + private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com"; + + public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map props) { + super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE); + this.type = "max_compute"; + catalogProperty = new CatalogProperty(resource, props); + } + + @Override + protected void initLocalObjectsImpl() { + Map props = catalogProperty.getProperties(); + String region = props.get(MCProperties.REGION); + String defaultProject = props.get(MCProperties.PROJECT); + if (Strings.isNullOrEmpty(region)) { + throw new IllegalArgumentException("Missing required property '" + MCProperties.REGION + "'."); + } + if (Strings.isNullOrEmpty(defaultProject)) { + throw new IllegalArgumentException("Missing required property '" + MCProperties.PROJECT + "'."); + } + if (region.startsWith("oss-")) { + // may use oss-cn-beijing, ensure compatible + region = region.replace("oss-", ""); + } + this.tunnelUrl = tunnelUrlTemplate.replace("{}", region); + CloudCredential credential = MCProperties.getCredential(props); + Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey()); + this.odps = new Odps(account); + odps.setEndpoint(odpsUrlTemplate.replace("{}", region)); + odps.setDefaultProject(defaultProject); + } + + public Odps getClient() { + makeSureInitialized(); + return odps; + } + + protected List listDatabaseNames() { + List result = new ArrayList<>(); + try { + result.add(odps.projects().get(odps.getDefaultProject()).getName()); + } catch (OdpsException e) { + throw new RuntimeException(e); + } + return result; + } + + @Override + public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); + try { + return odps.tables().exists(tblName); + } catch (OdpsException e) { + throw new RuntimeException(e); + } + } + + @Override + public List listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); + List result = new ArrayList<>(); + odps.tables().forEach(e -> result.add(e.getName())); + return result; + } + + /** + * data tunnel url + * @return tunnelUrl, required by jni scanner. + */ + public String getTunnelUrl() { + makeSureInitialized(); + return tunnelUrl; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 22aa3bf5c6..3fc528568f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,9 +17,6 @@ package org.apache.doris.datasource.iceberg; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.external.ExternalDatabase; -import org.apache.doris.catalog.external.IcebergExternalDatabase; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.util.Util; @@ -27,7 +24,6 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; -import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.catalog.Catalog; @@ -37,7 +33,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -55,47 +50,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { protected SupportsNamespaces nsCatalog; public IcebergExternalCatalog(long catalogId, String name) { - super(catalogId, name); + super(catalogId, name, InitCatalogLog.Type.ICEBERG); } @Override protected void init() { nsCatalog = (SupportsNamespaces) catalog; - Map tmpDbNameToId = Maps.newConcurrentMap(); - Map tmpIdToDb = Maps.newConcurrentMap(); - InitCatalogLog initCatalogLog = new InitCatalogLog(); - initCatalogLog.setCatalogId(id); - initCatalogLog.setType(InitCatalogLog.Type.ICEBERG); - List allDatabaseNames = listDatabaseNames(); - Map includeDatabaseMap = getIncludeDatabaseMap(); - Map excludeDatabaseMap = getExcludeDatabaseMap(); - for (String dbName : allDatabaseNames) { - // Exclude database map take effect with higher priority over include database map - if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) { - continue; - } - if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) { - continue; - } - long dbId; - if (dbNameToId != null && dbNameToId.containsKey(dbName)) { - dbId = dbNameToId.get(dbName); - tmpDbNameToId.put(dbName, dbId); - ExternalDatabase db = idToDb.get(dbId); - db.setUnInitialized(invalidCacheInInit); - tmpIdToDb.put(dbId, db); - initCatalogLog.addRefreshDb(dbId); - } else { - dbId = Env.getCurrentEnv().getNextId(); - tmpDbNameToId.put(dbName, dbId); - IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName); - tmpIdToDb.put(dbId, db); - initCatalogLog.addCreateDb(dbId, dbName); - } - } - dbNameToId = tmpDbNameToId; - idToDb = tmpIdToDb; - Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); + super.init(); } protected Configuration getConfiguration() { @@ -137,12 +98,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { .collect(Collectors.toList()); } - @Override - public List listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return new ArrayList<>(dbNameToId.keySet()); - } - @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java new file mode 100644 index 0000000000..0fb4274049 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.constants; + +import org.apache.doris.datasource.credentials.CloudCredential; + +import java.util.Map; + +/** + * properties for aliyun max compute + */ +public class MCProperties extends BaseProperties { + public static final String REGION = "mc.region"; + public static final String PROJECT = "mc.default.project"; + public static final String ACCESS_KEY = "mc.access_key"; + public static final String SECRET_KEY = "mc.secret_key"; + public static final String SESSION_TOKEN = "mc.session_token"; + + public static CloudCredential getCredential(Map props) { + return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java index 06dbd3f452..e6d72a1463 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalCatalog.java @@ -18,8 +18,6 @@ package org.apache.doris.datasource.test; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.TestExternalDatabase; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; @@ -27,7 +25,6 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,18 +42,14 @@ public class TestExternalCatalog extends ExternalCatalog { private TestCatalogProvider catalogProvider; public TestExternalCatalog(long catalogId, String name, String resource, Map props) { - super(catalogId, name); + super(catalogId, name, InitCatalogLog.Type.TEST); this.type = "test"; this.catalogProperty = new CatalogProperty(resource, props); Class providerClazz = null; try { providerClazz = Class.forName(props.get("catalog_provider.class")); this.catalogProvider = (TestCatalogProvider) providerClazz.newInstance(); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new RuntimeException(e); } } @@ -65,38 +58,7 @@ public class TestExternalCatalog extends ExternalCatalog { protected void initLocalObjectsImpl() { } - @Override - protected void init() { - Map tmpDbNameToId = Maps.newConcurrentMap(); - Map tmpIdToDb = Maps.newConcurrentMap(); - InitCatalogLog initCatalogLog = new InitCatalogLog(); - initCatalogLog.setCatalogId(id); - initCatalogLog.setType(InitCatalogLog.Type.TEST); - List allDatabaseNames = mockedDatabaseNames(); - for (String dbName : allDatabaseNames) { - long dbId; - if (dbNameToId != null && dbNameToId.containsKey(dbName)) { - dbId = dbNameToId.get(dbName); - tmpDbNameToId.put(dbName, dbId); - ExternalDatabase db = idToDb.get(dbId); - db.setUnInitialized(invalidCacheInInit); - tmpIdToDb.put(dbId, db); - initCatalogLog.addRefreshDb(dbId); - } else { - dbId = Env.getCurrentEnv().getNextId(); - tmpDbNameToId.put(dbName, dbId); - TestExternalDatabase db = new TestExternalDatabase(this, dbId, dbName); - tmpIdToDb.put(dbId, db); - initCatalogLog.addCreateDb(dbId, dbName); - } - } - dbNameToId = tmpDbNameToId; - idToDb = tmpIdToDb; - Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); - } - - private List mockedDatabaseNames() { - + protected List listDatabaseNames() { return Lists.newArrayList(catalogProvider.getMetadata().keySet()); } @@ -117,12 +79,6 @@ public class TestExternalCatalog extends ExternalCatalog { return catalogProvider.getMetadata().get(dbName).get(tblName); } - @Override - public List listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return Lists.newArrayList(dbNameToId.keySet()); - } - @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index d48097dea8..60fd334b7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -47,11 +47,14 @@ import org.apache.doris.catalog.external.IcebergExternalDatabase; import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalTable; +import org.apache.doris.catalog.external.MaxComputeExternalDatabase; +import org.apache.doris.catalog.external.MaxComputeExternalTable; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.JdbcExternalCatalog; +import org.apache.doris.datasource.MaxComputeExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog; @@ -183,7 +186,8 @@ public class GsonUtils { .registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) - .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()); + .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) + .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()); private static RuntimeTypeAdapterFactory dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of( DatabaseIf.class, "clazz") @@ -191,14 +195,16 @@ public class GsonUtils { .registerSubtype(EsExternalDatabase.class, EsExternalDatabase.class.getSimpleName()) .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName()) .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName()) - .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName()); + .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName()) + .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName()); private static RuntimeTypeAdapterFactory tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of( TableIf.class, "clazz").registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName()) .registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName()) .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName()) .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName()) - .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName()); + .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName()) + .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName()); // runtime adapter for class "HeartbeatResponse" private static RuntimeTypeAdapterFactory hbResponseTypeAdapterFactory diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 92e2b94148..c4e74c0139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -71,9 +71,11 @@ import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.external.FileQueryScanNode; import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.HudiScanNode; +import org.apache.doris.planner.external.MaxComputeScanNode; import org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; +import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TNullSide; import org.apache.doris.thrift.TPushAggOp; @@ -2020,6 +2022,11 @@ public class SingleNodePlanner { case ICEBERG_EXTERNAL_TABLE: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; + case MAX_COMPUTE_EXTERNAL_TABLE: + // TODO: support max compute scan node + scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode", + StatisticalType.MAX_COMPUTE_SCAN_NODE, true); + break; case ES_EXTERNAL_TABLE: scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode", true); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index edd63e2bec..9c5f0f4fbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -325,13 +325,9 @@ public abstract class FileQueryScanNode extends FileScanNode { return rangeDesc; } - protected TFileType getLocationType() throws UserException { - throw new NotImplementedException(""); - } + protected abstract TFileType getLocationType() throws UserException; - protected TFileFormatType getFileFormatType() throws UserException { - throw new NotImplementedException(""); - } + protected abstract TFileFormatType getFileFormatType() throws UserException; protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException { return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString()); @@ -341,17 +337,11 @@ public abstract class FileQueryScanNode extends FileScanNode { throw new NotImplementedException(""); } - protected List getPathPartitionKeys() throws UserException { - throw new NotImplementedException(""); - } + protected abstract List getPathPartitionKeys() throws UserException; - protected TableIf getTargetTable() throws UserException { - throw new NotImplementedException(""); - } + protected abstract TableIf getTargetTable() throws UserException; - protected Map getLocationProperties() throws UserException { - throw new NotImplementedException(""); - } + protected abstract Map getLocationProperties() throws UserException; // eg: hdfs://namenode s3://buckets protected String getFsName(FileSplit split) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java new file mode 100644 index 0000000000..367576ba6c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.MaxComputeExternalTable; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MaxComputeScanNode extends FileQueryScanNode { + + private final MaxComputeExternalTable table; + + public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, + StatisticalType statisticalType, boolean needCheckColumnPriv) { + super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); + table = (MaxComputeExternalTable) desc.getTable(); + } + + @Override + protected TFileType getLocationType() throws UserException { + return TFileType.FILE_STREAM; + } + + @Override + public TFileFormatType getFileFormatType() { + // TODO: use max compute format + return TFileFormatType.FORMAT_PARQUET; + } + + @Override + public List getPathPartitionKeys() { + return Collections.emptyList(); + } + + @Override + protected TableIf getTargetTable() throws UserException { + return table; + } + + @Override + protected Map getLocationProperties() throws UserException { + return new HashMap<>(); + } + + @Override + protected List getSplits() throws UserException { + List result = new ArrayList<>(); + result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList())); + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index c39be3cf2c..1d1857856d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -48,6 +48,7 @@ public enum StatisticalType { UNION_NODE, TABLE_VALUED_FUNCTION_NODE, FILE_SCAN_NODE, + MAX_COMPUTE_SCAN_NODE, METADATA_SCAN_NODE, JDBC_SCAN_NODE, TEST_EXTERNAL_TABLE, diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index b518bdd219..0c5d04e7a6 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -308,6 +308,12 @@ struct TJdbcTable { 8: optional string jdbc_driver_checksum } +struct TMCTable { + 1: optional string tunnel_url + 2: optional string project + 3: optional string table +} + // "Union" of all table types. struct TTableDescriptor { 1: required Types.TTableId id @@ -330,6 +336,7 @@ struct TTableDescriptor { 18: optional TIcebergTable icebergTable 19: optional THudiTable hudiTable 20: optional TJdbcTable jdbcTable + 21: optional TMCTable mcTable } struct TDescriptorTable { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 0c5c7d6f66..503007a6d3 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -596,6 +596,7 @@ enum TTableType { HUDI_TABLE, JDBC_TABLE, TEST_EXTERNAL_TABLE, + MAX_COMPUTE_TABLE, } enum TKeysType {