[feature](information_schema) add metadata_name_ids for quickly getting catalogs, dbs, and tables, and add a profiling table for MySQL compatibility (#22702)

Add information_schema.metadata_name_ids for quickly getting catalogs, databases, and tables.

1. Table structure:
```mysql
mysql> desc  internal.information_schema.metadata_name_ids;
+---------------+--------------+------+-------+---------+-------+
| Field         | Type         | Null | Key   | Default | Extra |
+---------------+--------------+------+-------+---------+-------+
| CATALOG_ID    | BIGINT       | Yes  | false | NULL    |       |
| CATALOG_NAME  | VARCHAR(512) | Yes  | false | NULL    |       |
| DATABASE_ID   | BIGINT       | Yes  | false | NULL    |       |
| DATABASE_NAME | VARCHAR(64)  | Yes  | false | NULL    |       |
| TABLE_ID      | BIGINT       | Yes  | false | NULL    |       |
| TABLE_NAME    | VARCHAR(64)  | Yes  | false | NULL    |       |
+---------------+--------------+------+-------+---------+-------+
6 rows in set (0.00 sec) 


mysql> select * from internal.information_schema.metadata_name_ids where CATALOG_NAME="hive1" limit 1 \G;
*************************** 1. row ***************************
   CATALOG_ID: 113008
 CATALOG_NAME: hive1
  DATABASE_ID: 113042
DATABASE_NAME: ssb1_parquet
     TABLE_ID: 114009
   TABLE_NAME: dates
1 row in set (0.07 sec)
```
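
Because every row carries both names and ids, a single query resolves a table name to its ids. A minimal sketch reusing the hive1 session above (the timing is illustrative):

```mysql
mysql> select CATALOG_ID, DATABASE_ID, TABLE_ID
    -> from internal.information_schema.metadata_name_ids
    -> where CATALOG_NAME = "hive1" and DATABASE_NAME = "ssb1_parquet" and TABLE_NAME = "dates";
+------------+-------------+----------+
| CATALOG_ID | DATABASE_ID | TABLE_ID |
+------------+-------------+----------+
|     113008 |      113042 |   114009 |
+------------+-------------+----------+
1 row in set (0.05 sec)
```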

2. When you create or drop a catalog, there is no need to refresh the catalog; the table reflects the change immediately.
```mysql
mysql> select count(*) from internal.information_schema.metadata_name_ids\G; 
*************************** 1. row ***************************
count(*): 21301
1 row in set (0.34 sec)


mysql> drop catalog hive2;
Query OK, 0 rows affected (0.01 sec)

mysql> select count(*) from internal.information_schema.metadata_name_ids\G; 
*************************** 1. row ***************************
count(*): 10665
1 row in set (0.04 sec) 


mysql> create catalog hive3 ... 
mysql> select count(*) from internal.information_schema.metadata_name_ids\G;                                                                        
*************************** 1. row ***************************
count(*): 21301
1 row in set (0.32 sec)
```
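
The same table can show where those totals come from. A sketch of a per-catalog breakdown (output omitted, since the split between catalogs is not shown in this session):

```mysql
mysql> select CATALOG_NAME, count(*) as table_count
    -> from internal.information_schema.metadata_name_ids
    -> group by CATALOG_NAME;
```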

3. Likewise, creating or dropping a table does not require a catalog refresh.
```mysql
mysql> CREATE TABLE IF NOT EXISTS demo.example_tbl ... ;


mysql> select count(*) from internal.information_schema.metadata_name_ids\G; 
*************************** 1. row ***************************
count(*): 10666
1 row in set (0.04 sec)

mysql> drop table demo.example_tbl;
Query OK, 0 rows affected (0.01 sec)

mysql> select count(*) from internal.information_schema.metadata_name_ids\G; 
*************************** 1. row ***************************
count(*): 10665
1 row in set (0.04 sec) 

```
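You can also verify the drop by name rather than by count. A sketch (the timing is illustrative):

```mysql
mysql> select TABLE_ID, TABLE_NAME from internal.information_schema.metadata_name_ids
    -> where DATABASE_NAME = "demo" and TABLE_NAME = "example_tbl";
Empty set (0.03 sec)
```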

4. You can set a query timeout to prevent the metadata query from taking too long.
```
fe.conf: query_metadata_name_ids_timeout

The time (in seconds) allowed for obtaining all tables in one database.
```
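A sketch of changing it at runtime, assuming the config is marked mutable (otherwise, set it in fe.conf and restart the FE); the value of 3 seconds is illustrative:

```mysql
mysql> ADMIN SET FRONTEND CONFIG ("query_metadata_name_ids_timeout" = "3");
Query OK, 0 rows affected (0.01 sec)
```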
5. Add information_schema.profiling for compatibility with MySQL.

```mysql
mysql> select * from information_schema.profiling;
Empty set (0.07 sec)

mysql> set profiling=1;                                                                                 
Query OK, 0 rows affected (0.01 sec)
```
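Both pieces are compatibility stubs: the diff below adds a profiling session variable (default false) so that SET profiling = 1 parses, and the table itself stays empty. A sketch of checking the variable afterwards (the output shape is illustrative):

```mysql
mysql> show variables like "profiling";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| profiling     | true  |
+---------------+-------+
1 row in set (0.01 sec)
```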
daidai committed 2023-08-31 21:22:26 +08:00 (committed by GitHub)
parent 6fe2418cfc · commit e680d42fe7
19 changed files with 743 additions and 52 deletions


@@ -68,8 +68,9 @@ public enum SchemaTableType {
SCH_CREATE_TABLE("CREATE_TABLE", "CREATE_TABLE", TSchemaTableType.SCH_CREATE_TABLE),
SCH_INVALID("NULL", "NULL", TSchemaTableType.SCH_INVALID),
SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS),
SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS);
SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS),
SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", TSchemaTableType.SCH_METADATA_NAME_IDS),
SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;


@@ -250,10 +250,10 @@ public class SchemaTable extends Table {
// Compatible with mysql for mysqldump
.put("column_statistics",
new SchemaTable(SystemIdGenerator.getNextId(), "column_statistics", TableType.SCHEMA,
builder().column("SCHEMA_NAME", ScalarType.createVarchar(64))
.column("TABLE_NAME", ScalarType.createVarchar(64))
.column("COLUMN_NAME", ScalarType.createVarchar(64))
.column("HISTOGRAM", ScalarType.createJsonbType()).build()))
builder().column("SCHEMA_NAME", ScalarType.createVarchar(64))
.column("TABLE_NAME", ScalarType.createVarchar(64))
.column("COLUMN_NAME", ScalarType.createVarchar(64))
.column("HISTOGRAM", ScalarType.createJsonbType()).build()))
.put("files",
new SchemaTable(SystemIdGenerator.getNextId(), "files", TableType.SCHEMA,
builder().column("FILE_ID", ScalarType.createType(PrimitiveType.BIGINT))
@@ -384,37 +384,66 @@ public class SchemaTable extends Table {
.column("COLLATION_CONNECTION", ScalarType.createVarchar(32))
.column("DATABASE_COLLATION", ScalarType.createVarchar(32)).build()))
.put("rowsets", new SchemaTable(SystemIdGenerator.getNextId(), "rowsets", TableType.SCHEMA,
builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("ROWSET_ID", ScalarType.createVarchar(64))
.column("TABLET_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("ROWSET_NUM_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
.column("TXN_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("NUM_SEGMENTS", ScalarType.createType(PrimitiveType.BIGINT))
.column("START_VERSION", ScalarType.createType(PrimitiveType.BIGINT))
.column("END_VERSION", ScalarType.createType(PrimitiveType.BIGINT))
.column("INDEX_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATA_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATION_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT))
.build()))
builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("ROWSET_ID", ScalarType.createVarchar(64))
.column("TABLET_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("ROWSET_NUM_ROWS", ScalarType.createType(PrimitiveType.BIGINT))
.column("TXN_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("NUM_SEGMENTS", ScalarType.createType(PrimitiveType.BIGINT))
.column("START_VERSION", ScalarType.createType(PrimitiveType.BIGINT))
.column("END_VERSION", ScalarType.createType(PrimitiveType.BIGINT))
.column("INDEX_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATA_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATION_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT))
.build()))
.put("parameters", new SchemaTable(SystemIdGenerator.getNextId(), "parameters", TableType.SCHEMA,
builder().column("SPECIFIC_CATALOG", ScalarType.createVarchar(64))
.column("SPECIFIC_SCHEMA", ScalarType.createVarchar(64))
.column("SPECIFIC_NAME", ScalarType.createVarchar(64))
.column("ORDINAL_POSITION", ScalarType.createVarchar(77))
.column("PARAMETER_MODE", ScalarType.createVarchar(77))
.column("PARAMETER_NAME", ScalarType.createVarchar(77))
.column("DATA_TYPE", ScalarType.createVarchar(64))
.column("CHARACTER_OCTET_LENGTH", ScalarType.createVarchar(64))
.column("NUMERIC_PRECISION", ScalarType.createVarchar(512))
.column("NUMERIC_SCALE", ScalarType.createVarchar(64))
.column("DATETIME_PRECISION", ScalarType.createVarchar(64))
.column("CHARACTER_SET_NAME", ScalarType.createVarchar(256))
.column("COLLATION_NAME", ScalarType.createVarchar(64))
.column("DTD_IDENTIFIER", ScalarType.createVarchar(64))
.column("ROUTINE_TYPE", ScalarType.createVarchar(64))
.column("DATA_TYPEDTD_IDENDS", ScalarType.createVarchar(64))
.build()))
builder().column("SPECIFIC_CATALOG", ScalarType.createVarchar(64))
.column("SPECIFIC_SCHEMA", ScalarType.createVarchar(64))
.column("SPECIFIC_NAME", ScalarType.createVarchar(64))
.column("ORDINAL_POSITION", ScalarType.createVarchar(77))
.column("PARAMETER_MODE", ScalarType.createVarchar(77))
.column("PARAMETER_NAME", ScalarType.createVarchar(77))
.column("DATA_TYPE", ScalarType.createVarchar(64))
.column("CHARACTER_OCTET_LENGTH", ScalarType.createVarchar(64))
.column("NUMERIC_PRECISION", ScalarType.createVarchar(512))
.column("NUMERIC_SCALE", ScalarType.createVarchar(64))
.column("DATETIME_PRECISION", ScalarType.createVarchar(64))
.column("CHARACTER_SET_NAME", ScalarType.createVarchar(256))
.column("COLLATION_NAME", ScalarType.createVarchar(64))
.column("DTD_IDENTIFIER", ScalarType.createVarchar(64))
.column("ROUTINE_TYPE", ScalarType.createVarchar(64))
.column("DATA_TYPEDTD_IDENDS", ScalarType.createVarchar(64))
.build()))
.put("metadata_name_ids", new SchemaTable(SystemIdGenerator.getNextId(),
"metadata_name_ids", TableType.SCHEMA,
builder().column("CATALOG_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("CATALOG_NAME", ScalarType.createVarchar(FN_REFLEN))
.column("DATABASE_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATABASE_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("TABLE_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("TABLE_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.build()))
.put("profiling", new SchemaTable(SystemIdGenerator.getNextId(), "profiling", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createType(PrimitiveType.INT))
.column("SEQ", ScalarType.createType(PrimitiveType.INT))
.column("STATE", ScalarType.createVarchar(30))
.column("DURATION", ScalarType.createType(PrimitiveType.DOUBLE))
.column("CPU_USER", ScalarType.createType(PrimitiveType.DOUBLE))
.column("CPU_SYSTEM", ScalarType.createType(PrimitiveType.DOUBLE))
.column("CONTEXT_VOLUNTARY", ScalarType.createType(PrimitiveType.INT))
.column("CONTEXT_INVOLUNTARY", ScalarType.createType(PrimitiveType.INT))
.column("BLOCK_OPS_IN", ScalarType.createType(PrimitiveType.INT))
.column("BLOCK_OPS_OUT", ScalarType.createType(PrimitiveType.INT))
.column("MESSAGES_SENT", ScalarType.createType(PrimitiveType.INT))
.column("MESSAGES_RECEIVED", ScalarType.createType(PrimitiveType.INT))
.column("PAGE_FAULTS_MAJOR", ScalarType.createType(PrimitiveType.INT))
.column("PAGE_FAULTS_MINOR", ScalarType.createType(PrimitiveType.INT))
.column("SWAPS", ScalarType.createType(PrimitiveType.INT))
.column("SOURCE_FUNCTION", ScalarType.createVarchar(30))
.column("SOURCE_FILE", ScalarType.createVarchar(20))
.column("SOURCE_LINE", ScalarType.createType(PrimitiveType.INT))
.build()))
.build();
protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) {


@@ -587,6 +587,7 @@ public abstract class ExternalCatalog
@Override
public Collection<DatabaseIf> getAllDbs() {
makeSureInitialized();
return new HashSet<>(idToDb.values());
}
}


@@ -66,6 +66,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
@@ -133,6 +134,9 @@ public class SessionVariable implements Serializable, Writable {
public static final String DEFAULT_STORAGE_ENGINE = "default_storage_engine";
public static final String DEFAULT_TMP_STORAGE_ENGINE = "default_tmp_storage_engine";
// Compatible with mysql
public static final String PROFILLING = "profiling";
public static final String DIV_PRECISION_INCREMENT = "div_precision_increment";
// see comment of `doris_max_scan_key_num` and `max_pushdown_conditions_per_column` in BE config
@@ -726,6 +730,9 @@ public class SessionVariable implements Serializable, Writable {
return beNumberForTest;
}
@VariableMgr.VarAttr(name = PROFILLING)
public boolean profiling = false;
public void setBeNumberForTest(int beNumberForTest) {
this.beNumberForTest = beNumberForTest;
}
@@ -1318,6 +1325,7 @@ public class SessionVariable implements Serializable, Writable {
return insertTimeoutS;
}
public void setInsertTimeoutS(int insertTimeoutS) {
this.insertTimeoutS = insertTimeoutS;
}


@@ -140,6 +140,7 @@ import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
import org.apache.doris.thrift.TLoadTxn2PCResult;
@@ -180,6 +181,7 @@ import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TTxnParams;
@@ -208,13 +210,19 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;
@@ -324,8 +332,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LOG.debug("get db request: {}", params);
TGetDbsResult result = new TGetDbsResult();
List<String> dbs = Lists.newArrayList();
List<String> catalogs = Lists.newArrayList();
List<String> dbNames = Lists.newArrayList();
List<String> catalogNames = Lists.newArrayList();
List<Long> dbIds = Lists.newArrayList();
List<Long> catalogIds = Lists.newArrayList();
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
@@ -345,40 +356,51 @@ public class FrontendServiceImpl implements FrontendService.Iface {
.getCatalogOrException(params.catalog, catalog -> new TException("Unknown catalog " + catalog)));
}
for (CatalogIf catalog : catalogIfs) {
List<String> dbNames;
Collection<DatabaseIf> dbs = new HashSet<DatabaseIf>();
try {
dbNames = catalog.getDbNamesOrEmpty();
dbs = catalog.getAllDbs();
} catch (Exception e) {
LOG.warn("failed to get database names for catalog {}", catalog.getName(), e);
// Some external catalog may fail to get databases due to wrong connection info.
// So continue here to get databases of other catalogs.
}
LOG.debug("get db size: {}, in catalog: {}", dbs.size(), catalog.getName());
if (dbs.isEmpty() && params.isSetGetNullCatalog() && params.get_null_catalog) {
catalogNames.add(catalog.getName());
dbNames.add("NULL");
catalogIds.add(catalog.getId());
dbIds.add(-1L);
continue;
}
if (dbs.isEmpty()) {
continue;
}
LOG.debug("get db names: {}, in catalog: {}", dbNames, catalog.getName());
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
for (String fullName : dbNames) {
for (DatabaseIf db : dbs) {
String fullName = db.getFullName();
if (!env.getAccessManager().checkDbPriv(currentUser, fullName, PrivPredicate.SHOW)) {
continue;
}
final String db = ClusterNamespace.getNameFromFullName(fullName);
if (matcher != null && !matcher.match(db)) {
if (matcher != null && !matcher.match(ClusterNamespace.getNameFromFullName(fullName))) {
continue;
}
catalogs.add(catalog.getName());
dbs.add(fullName);
catalogNames.add(catalog.getName());
dbNames.add(fullName);
catalogIds.add(catalog.getId());
dbIds.add(db.getId());
}
}
result.setDbs(dbs);
result.setCatalogs(catalogs);
result.setDbs(dbNames);
result.setCatalogs(catalogNames);
result.setCatalogIds(catalogIds);
result.setDbIds(dbIds);
return result;
}
@@ -679,6 +701,87 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
public TListTableMetadataNameIdsResult listTableMetadataNameIds(TGetTablesParams params) throws TException {
LOG.debug("get list simple table request: {}", params);
TListTableMetadataNameIdsResult result = new TListTableMetadataNameIdsResult();
List<TTableMetadataNameIds> tablesResult = Lists.newArrayList();
result.setTables(tablesResult);
UserIdentity currentUser;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
String catalogName;
if (params.isSetCatalog()) {
catalogName = params.catalog;
} else {
catalogName = InternalCatalog.INTERNAL_CATALOG_NAME;
}
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
CaseSensibility.TABLE.getCaseSensibility());
} catch (PatternMatcherException e) {
throw new TException("Pattern is in bad format " + params.getPattern());
}
}
PatternMatcher finalMatcher = matcher;
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (catalog != null) {
DatabaseIf db = catalog.getDbNullable(params.db);
if (db != null) {
List<TableIf> tables = db.getTables();
for (TableIf table : tables) {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUser, params.db,
table.getName(), PrivPredicate.SHOW)) {
continue;
}
table.readLock();
try {
if (finalMatcher != null && !finalMatcher.match(table.getName())) {
continue;
}
TTableMetadataNameIds status = new TTableMetadataNameIds();
status.setName(table.getName());
status.setId(table.getId());
tablesResult.add(status);
} finally {
table.readUnlock();
}
}
}
}
});
try {
if (catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
future.get();
} else {
future.get(Config.query_metadata_name_ids_timeout, TimeUnit.SECONDS);
}
} catch (TimeoutException e) {
future.cancel(true);
LOG.info("From catalog:{},db:{} get tables timeout.", catalogName, params.db);
} catch (InterruptedException | ExecutionException e) {
future.cancel(true);
} finally {
executor.shutdown();
}
return result;
}
@Override
public TListPrivilegesResult listTablePrivilegeStatus(TGetTablesParams params) throws TException {
LOG.debug("get list table privileges request: {}", params);