From 8abd00dcd5ce367b3871143313aa2d4af0bd2fb3 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Sat, 25 Jun 2022 11:53:04 +0800 Subject: [PATCH] [feature-wip](multi-catalog) Add catalog name to information schema. (#10349) Information schema database need to show catalog name after multi-catalog is supported. This part is step 1, add catalog name for schemata table. --- .../schema_schemata_scanner.cpp | 9 +- .../schema_scanner/schema_tables_scanner.cpp | 13 +- .../org/apache/doris/catalog/TableIf.java | 16 +++ .../doris/catalog/external/ExternalTable.java | 44 ++++++ .../catalog/external/HMSExternalTable.java | 53 +++++++ .../doris/datasource/DataSourceMgr.java | 9 ++ .../doris/service/FrontendServiceImpl.java | 129 ++++++++++-------- gensrc/thrift/FrontendService.thrift | 4 +- 8 files changed, 218 insertions(+), 59 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp index d4a69b37b4..cd794bdeaa 100644 --- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp @@ -72,7 +72,14 @@ Status SchemaSchemataScanner::fill_one_row(Tuple* tuple, MemPool* pool) { memset((void*)tuple, 0, _tuple_desc->num_null_bytes()); // catalog - { tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); } + { + void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + std::string catalog_name = _db_result.catalogs[_db_index]; + str_slot->ptr = (char*)pool->allocate(catalog_name.size()); + str_slot->len = catalog_name.size(); + memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len); + } // schema { void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset()); diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp b/be/src/exec/schema_scanner/schema_tables_scanner.cpp index 580e6cbc41..142200f9ed 100644 --- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp @@ -89,7 +89,14 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple, MemPool* pool) { memset((void*)tuple, 0, _tuple_desc->num_null_bytes()); const TTableStatus& tbl_status = _table_result.tables[_table_index]; // catalog - { tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); } + { + void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + std::string catalog_name = _db_result.catalogs[_db_index - 1]; + str_slot->ptr = (char*)pool->allocate(catalog_name.size()); + str_slot->len = catalog_name.size(); + memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len); + } // schema { void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset()); @@ -245,7 +252,9 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple, MemPool* pool) { Status SchemaTablesScanner::get_new_table() { TGetTablesParams table_params; - table_params.__set_db(_db_result.dbs[_db_index++]); + table_params.__set_db(_db_result.dbs[_db_index]); + table_params.__set_catalog(_db_result.catalogs[_db_index]); + _db_index++; if (nullptr != _param->wild) { table_params.__set_pattern(*(_param->wild)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d69bbe3213..01efa961ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -72,6 +72,22 @@ public interface TableIf { Column getColumn(String name); + String getMysqlType(); + + String getEngine(); + + String getComment(); + + long getCreateTime(); + + long getUpdateTime(); + + long getRowCount(); + + long getDataLength(); + + long getAvgRowLength(); + /** * Doris table type. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 7a96dc970e..ab5a3fa6ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -200,4 +200,48 @@ public class ExternalTable implements TableIf { public Column getColumn(String name) { throw new NotImplementedException(); } + + @Override + public String getMysqlType() { + throw new NotImplementedException(); + } + + @Override + public String getEngine() { + throw new NotImplementedException(); + } + + @Override + public String getComment() { + throw new NotImplementedException(); + } + + @Override + public long getCreateTime() { + throw new NotImplementedException(); + } + + @Override + public long getUpdateTime() { + throw new NotImplementedException(); + } + + @Override + public long getRowCount() { + throw new NotImplementedException(); + } + + @Override + public long getDataLength() { + throw new NotImplementedException(); + } + + @Override + public long getAvgRowLength() { + throw new NotImplementedException(); + } + + public long getLastCheckTime() { + throw new NotImplementedException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index c4167b89df..9c6e9b12fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -132,6 +132,59 @@ public class HMSExternalTable extends ExternalTable { return null; } + @Override + public String getMysqlType() { + return type.name(); + } + + @Override + public String getEngine() { + switch (type) { + case HIVE: + return "Hive"; + case ICEBERG: + return "Iceberg"; + case HUDI: + return "Hudi"; + default: + return null; + } + } + + @Override + public String getComment() { + return ""; + } + + @Override + public long getCreateTime() { + return 0; + } + + @Override + public long getUpdateTime() { + return 0; + } + + @Override + public long getRowCount() { + return 0; + } + + @Override + public long getDataLength() { + return 0; + } + + @Override + public long getAvgRowLength() { + return 0; + } + + public long getLastCheckTime() { + return 0; + } + /** * get database name of hms table. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java index d77cc9a18d..8f04e035b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * DataSourceMgr will load all data sources at FE startup, @@ -78,6 +79,10 @@ public class DataSourceMgr implements Writable { return internalDataSource; } + public DataSourceIf getCatalog(String name) { + return nameToCatalogs.get(name); + } + private void writeLock() { lock.writeLock().lock(); } @@ -241,6 +246,10 @@ public class DataSourceMgr implements Writable { } } + public List listCatalogs() { + return nameToCatalogs.values().stream().collect(Collectors.toList()); + } + /** * Reply for alter catalog props event. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 67aa2baff0..b8419215a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -22,9 +22,12 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; @@ -38,6 +41,7 @@ import org.apache.doris.common.ThriftServerContext; import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; +import org.apache.doris.datasource.DataSourceIf; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.LoadJob; import org.apache.doris.load.MiniEtlTaskInfo; @@ -150,6 +154,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { TGetDbsResult result = new TGetDbsResult(); List dbs = Lists.newArrayList(); + List catalogs = Lists.newArrayList(); PatternMatcher matcher = null; if (params.isSetPattern()) { try { @@ -161,28 +166,34 @@ public class FrontendServiceImpl implements FrontendService.Iface { } Catalog catalog = Catalog.getCurrentCatalog(); - List dbNames = catalog.getDbNames(); - LOG.debug("get db names: {}", dbNames); + List dataSourceIfs = catalog.getDataSourceMgr().listCatalogs(); + for (DataSourceIf ds : dataSourceIfs) { + List dbNames = ds.getDbNames(); + LOG.debug("get db names: {}, in data source: {}", dbNames, ds.getName()); - UserIdentity currentUser = null; - if (params.isSetCurrentUserIdent()) { - currentUser = UserIdentity.fromThrift(params.current_user_ident); - } else { - currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip); - } - for (String fullName : dbNames) { - if (!catalog.getAuth().checkDbPriv(currentUser, fullName, PrivPredicate.SHOW)) { - continue; + UserIdentity currentUser = null; + if (params.isSetCurrentUserIdent()) { + currentUser = UserIdentity.fromThrift(params.current_user_ident); + } else { + currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip); } + for (String fullName : dbNames) { + if (!catalog.getAuth().checkDbPriv(currentUser, fullName, PrivPredicate.SHOW)) { + continue; + } - final String db = ClusterNamespace.getNameFromFullName(fullName); - if (matcher != null && !matcher.match(db)) { - continue; + final String db = ClusterNamespace.getNameFromFullName(fullName); + if (matcher != null && !matcher.match(db)) { + continue; + } + + catalogs.add(ds.getName()); + dbs.add(fullName); } - - dbs.add(fullName); } + result.setDbs(dbs); + result.setCatalogs(catalogs); return result; } @@ -243,7 +254,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new TException("Pattern is in bad format " + params.getPattern()); } } - // database privs should be checked in analysis phrase UserIdentity currentUser; @@ -252,51 +262,60 @@ public class FrontendServiceImpl implements FrontendService.Iface { } else { currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip); } - Database db = Catalog.getCurrentCatalog().getDbNullable(params.db); - if (db != null) { - List tables = null; - if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) { - tables = db.getTables(); - } else { - switch (params.getType()) { - case "VIEW": - tables = db.getViews(); - break; - default: - tables = db.getTables(); + DataSourceIf ds = Catalog.getCurrentCatalog().getDataSourceMgr().getCatalog(params.catalog); + if (ds != null) { + DatabaseIf db = ds.getDbNullable(params.db); + if (db != null) { + List tables = null; + if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) { + tables = db.getTables(); + } else { + switch (params.getType()) { + case "VIEW": + tables = db.getViews(); + break; + default: + tables = db.getTables(); + } } - } - for (Table table : tables) { - if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db, - table.getName(), PrivPredicate.SHOW)) { - continue; - } - table.readLock(); - try { + for (TableIf table : tables) { if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db, table.getName(), PrivPredicate.SHOW)) { continue; } + table.readLock(); + try { + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db, + table.getName(), PrivPredicate.SHOW)) { + continue; + } - if (matcher != null && !matcher.match(table.getName())) { - continue; + if (matcher != null && !matcher.match(table.getName())) { + continue; + } + long lastCheckTime = 0; + if (table instanceof Table) { + lastCheckTime = ((Table) table).getLastCheckTime(); + } else { + lastCheckTime = ((ExternalTable) table).getLastCheckTime(); + } + TTableStatus status = new TTableStatus(); + status.setName(table.getName()); + status.setType(table.getMysqlType()); + status.setEngine(table.getEngine()); + status.setComment(table.getComment()); + status.setCreateTime(table.getCreateTime()); + status.setLastCheckTime(lastCheckTime); + status.setUpdateTime(table.getUpdateTime() / 1000); + status.setCheckTime(lastCheckTime); + status.setCollation("utf-8"); + status.setRows(table.getRowCount()); + status.setDataLength(table.getDataLength()); + status.setAvgRowLength(table.getAvgRowLength()); + tablesResult.add(status); + } finally { + table.readUnlock(); } - TTableStatus status = new TTableStatus(); - status.setName(table.getName()); - status.setType(table.getMysqlType()); - status.setEngine(table.getEngine()); - status.setComment(table.getComment()); - status.setCreateTime(table.getCreateTime()); - status.setLastCheckTime(table.getLastCheckTime()); - status.setUpdateTime(table.getUpdateTime() / 1000); - status.setCheckTime(table.getLastCheckTime()); - status.setCollation("utf-8"); - status.setRows(table.getRowCount()); - status.setDataLength(table.getDataLength()); - status.setAvgRowLength(table.getAvgRowLength()); - tablesResult.add(status); - } finally { - table.readUnlock(); } } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f195a11722..ae3018d8a4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -287,9 +287,10 @@ struct TGetDbsParams { 4: optional Types.TUserIdentity current_user_ident // to replace the user and user ip } -// getDbNames returns a list of database names +// getDbNames returns a list of database names and catalog names struct TGetDbsResult { 1: list dbs + 2: list catalogs } // Arguments to getTableNames, which returns a list of tables that match an @@ -304,6 +305,7 @@ struct TGetTablesParams { 4: optional string user_ip // deprecated 5: optional Types.TUserIdentity current_user_ident // to replace the user and user ip 6: optional string type + 7: optional string catalog } struct TTableStatus {