[feature-wip](multi-catalog) Add catalog name to information schema. (#10349)
Information schema database need to show catalog name after multi-catalog is supported. This part is step 1, add catalog name for schemata table.
This commit is contained in:
@ -72,7 +72,14 @@ Status SchemaSchemataScanner::fill_one_row(Tuple* tuple, MemPool* pool) {
|
||||
memset((void*)tuple, 0, _tuple_desc->num_null_bytes());
|
||||
|
||||
// catalog
|
||||
{ tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); }
|
||||
{
|
||||
void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset());
|
||||
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
|
||||
std::string catalog_name = _db_result.catalogs[_db_index];
|
||||
str_slot->ptr = (char*)pool->allocate(catalog_name.size());
|
||||
str_slot->len = catalog_name.size();
|
||||
memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len);
|
||||
}
|
||||
// schema
|
||||
{
|
||||
void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset());
|
||||
|
||||
@ -89,7 +89,14 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple, MemPool* pool) {
|
||||
memset((void*)tuple, 0, _tuple_desc->num_null_bytes());
|
||||
const TTableStatus& tbl_status = _table_result.tables[_table_index];
|
||||
// catalog
|
||||
{ tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); }
|
||||
{
|
||||
void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset());
|
||||
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
|
||||
std::string catalog_name = _db_result.catalogs[_db_index - 1];
|
||||
str_slot->ptr = (char*)pool->allocate(catalog_name.size());
|
||||
str_slot->len = catalog_name.size();
|
||||
memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len);
|
||||
}
|
||||
// schema
|
||||
{
|
||||
void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset());
|
||||
@ -245,7 +252,9 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple, MemPool* pool) {
|
||||
|
||||
Status SchemaTablesScanner::get_new_table() {
|
||||
TGetTablesParams table_params;
|
||||
table_params.__set_db(_db_result.dbs[_db_index++]);
|
||||
table_params.__set_db(_db_result.dbs[_db_index]);
|
||||
table_params.__set_catalog(_db_result.catalogs[_db_index]);
|
||||
_db_index++;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
}
|
||||
|
||||
@ -72,6 +72,22 @@ public interface TableIf {
|
||||
|
||||
Column getColumn(String name);
|
||||
|
||||
String getMysqlType();
|
||||
|
||||
String getEngine();
|
||||
|
||||
String getComment();
|
||||
|
||||
long getCreateTime();
|
||||
|
||||
long getUpdateTime();
|
||||
|
||||
long getRowCount();
|
||||
|
||||
long getDataLength();
|
||||
|
||||
long getAvgRowLength();
|
||||
|
||||
/**
|
||||
* Doris table type.
|
||||
*/
|
||||
|
||||
@ -200,4 +200,48 @@ public class ExternalTable implements TableIf {
|
||||
public Column getColumn(String name) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMysqlType() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEngine() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getComment() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreateTime() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUpdateTime() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRowCount() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataLength() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAvgRowLength() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public long getLastCheckTime() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,6 +132,59 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMysqlType() {
|
||||
return type.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEngine() {
|
||||
switch (type) {
|
||||
case HIVE:
|
||||
return "Hive";
|
||||
case ICEBERG:
|
||||
return "Iceberg";
|
||||
case HUDI:
|
||||
return "Hudi";
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getComment() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreateTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUpdateTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRowCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAvgRowLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public long getLastCheckTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* get database name of hms table.
|
||||
*/
|
||||
|
||||
@ -47,6 +47,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* DataSourceMgr will load all data sources at FE startup,
|
||||
@ -78,6 +79,10 @@ public class DataSourceMgr implements Writable {
|
||||
return internalDataSource;
|
||||
}
|
||||
|
||||
public DataSourceIf getCatalog(String name) {
|
||||
return nameToCatalogs.get(name);
|
||||
}
|
||||
|
||||
private void writeLock() {
|
||||
lock.writeLock().lock();
|
||||
}
|
||||
@ -241,6 +246,10 @@ public class DataSourceMgr implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public List<DataSourceIf> listCatalogs() {
|
||||
return nameToCatalogs.values().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Reply for alter catalog props event.
|
||||
*/
|
||||
|
||||
@ -22,9 +22,12 @@ import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.AuthenticationException;
|
||||
@ -38,6 +41,7 @@ import org.apache.doris.common.ThriftServerContext;
|
||||
import org.apache.doris.common.ThriftServerEventProcessor;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.Version;
|
||||
import org.apache.doris.datasource.DataSourceIf;
|
||||
import org.apache.doris.load.EtlStatus;
|
||||
import org.apache.doris.load.LoadJob;
|
||||
import org.apache.doris.load.MiniEtlTaskInfo;
|
||||
@ -150,6 +154,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
TGetDbsResult result = new TGetDbsResult();
|
||||
|
||||
List<String> dbs = Lists.newArrayList();
|
||||
List<String> catalogs = Lists.newArrayList();
|
||||
PatternMatcher matcher = null;
|
||||
if (params.isSetPattern()) {
|
||||
try {
|
||||
@ -161,28 +166,34 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
}
|
||||
|
||||
Catalog catalog = Catalog.getCurrentCatalog();
|
||||
List<String> dbNames = catalog.getDbNames();
|
||||
LOG.debug("get db names: {}", dbNames);
|
||||
List<DataSourceIf> dataSourceIfs = catalog.getDataSourceMgr().listCatalogs();
|
||||
for (DataSourceIf ds : dataSourceIfs) {
|
||||
List<String> dbNames = ds.getDbNames();
|
||||
LOG.debug("get db names: {}, in data source: {}", dbNames, ds.getName());
|
||||
|
||||
UserIdentity currentUser = null;
|
||||
if (params.isSetCurrentUserIdent()) {
|
||||
currentUser = UserIdentity.fromThrift(params.current_user_ident);
|
||||
} else {
|
||||
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
|
||||
}
|
||||
for (String fullName : dbNames) {
|
||||
if (!catalog.getAuth().checkDbPriv(currentUser, fullName, PrivPredicate.SHOW)) {
|
||||
continue;
|
||||
UserIdentity currentUser = null;
|
||||
if (params.isSetCurrentUserIdent()) {
|
||||
currentUser = UserIdentity.fromThrift(params.current_user_ident);
|
||||
} else {
|
||||
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
|
||||
}
|
||||
for (String fullName : dbNames) {
|
||||
if (!catalog.getAuth().checkDbPriv(currentUser, fullName, PrivPredicate.SHOW)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final String db = ClusterNamespace.getNameFromFullName(fullName);
|
||||
if (matcher != null && !matcher.match(db)) {
|
||||
continue;
|
||||
final String db = ClusterNamespace.getNameFromFullName(fullName);
|
||||
if (matcher != null && !matcher.match(db)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
catalogs.add(ds.getName());
|
||||
dbs.add(fullName);
|
||||
}
|
||||
|
||||
dbs.add(fullName);
|
||||
}
|
||||
|
||||
result.setDbs(dbs);
|
||||
result.setCatalogs(catalogs);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -243,7 +254,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
throw new TException("Pattern is in bad format " + params.getPattern());
|
||||
}
|
||||
}
|
||||
|
||||
// database privs should be checked in analysis phrase
|
||||
|
||||
UserIdentity currentUser;
|
||||
@ -252,51 +262,60 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
} else {
|
||||
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
|
||||
}
|
||||
Database db = Catalog.getCurrentCatalog().getDbNullable(params.db);
|
||||
if (db != null) {
|
||||
List<Table> tables = null;
|
||||
if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) {
|
||||
tables = db.getTables();
|
||||
} else {
|
||||
switch (params.getType()) {
|
||||
case "VIEW":
|
||||
tables = db.getViews();
|
||||
break;
|
||||
default:
|
||||
tables = db.getTables();
|
||||
DataSourceIf ds = Catalog.getCurrentCatalog().getDataSourceMgr().getCatalog(params.catalog);
|
||||
if (ds != null) {
|
||||
DatabaseIf db = ds.getDbNullable(params.db);
|
||||
if (db != null) {
|
||||
List<TableIf> tables = null;
|
||||
if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) {
|
||||
tables = db.getTables();
|
||||
} else {
|
||||
switch (params.getType()) {
|
||||
case "VIEW":
|
||||
tables = db.getViews();
|
||||
break;
|
||||
default:
|
||||
tables = db.getTables();
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Table table : tables) {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
|
||||
table.getName(), PrivPredicate.SHOW)) {
|
||||
continue;
|
||||
}
|
||||
table.readLock();
|
||||
try {
|
||||
for (TableIf table : tables) {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
|
||||
table.getName(), PrivPredicate.SHOW)) {
|
||||
continue;
|
||||
}
|
||||
table.readLock();
|
||||
try {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
|
||||
table.getName(), PrivPredicate.SHOW)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (matcher != null && !matcher.match(table.getName())) {
|
||||
continue;
|
||||
if (matcher != null && !matcher.match(table.getName())) {
|
||||
continue;
|
||||
}
|
||||
long lastCheckTime = 0;
|
||||
if (table instanceof Table) {
|
||||
lastCheckTime = ((Table) table).getLastCheckTime();
|
||||
} else {
|
||||
lastCheckTime = ((ExternalTable) table).getLastCheckTime();
|
||||
}
|
||||
TTableStatus status = new TTableStatus();
|
||||
status.setName(table.getName());
|
||||
status.setType(table.getMysqlType());
|
||||
status.setEngine(table.getEngine());
|
||||
status.setComment(table.getComment());
|
||||
status.setCreateTime(table.getCreateTime());
|
||||
status.setLastCheckTime(lastCheckTime);
|
||||
status.setUpdateTime(table.getUpdateTime() / 1000);
|
||||
status.setCheckTime(lastCheckTime);
|
||||
status.setCollation("utf-8");
|
||||
status.setRows(table.getRowCount());
|
||||
status.setDataLength(table.getDataLength());
|
||||
status.setAvgRowLength(table.getAvgRowLength());
|
||||
tablesResult.add(status);
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
TTableStatus status = new TTableStatus();
|
||||
status.setName(table.getName());
|
||||
status.setType(table.getMysqlType());
|
||||
status.setEngine(table.getEngine());
|
||||
status.setComment(table.getComment());
|
||||
status.setCreateTime(table.getCreateTime());
|
||||
status.setLastCheckTime(table.getLastCheckTime());
|
||||
status.setUpdateTime(table.getUpdateTime() / 1000);
|
||||
status.setCheckTime(table.getLastCheckTime());
|
||||
status.setCollation("utf-8");
|
||||
status.setRows(table.getRowCount());
|
||||
status.setDataLength(table.getDataLength());
|
||||
status.setAvgRowLength(table.getAvgRowLength());
|
||||
tablesResult.add(status);
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -287,9 +287,10 @@ struct TGetDbsParams {
|
||||
4: optional Types.TUserIdentity current_user_ident // to replace the user and user ip
|
||||
}
|
||||
|
||||
// getDbNames returns a list of database names
|
||||
// getDbNames returns a list of database names and catalog names
|
||||
struct TGetDbsResult {
|
||||
1: list<string> dbs
|
||||
2: list<string> catalogs
|
||||
}
|
||||
|
||||
// Arguments to getTableNames, which returns a list of tables that match an
|
||||
@ -304,6 +305,7 @@ struct TGetTablesParams {
|
||||
4: optional string user_ip // deprecated
|
||||
5: optional Types.TUserIdentity current_user_ident // to replace the user and user ip
|
||||
6: optional string type
|
||||
7: optional string catalog
|
||||
}
|
||||
|
||||
struct TTableStatus {
|
||||
|
||||
Reference in New Issue
Block a user