[feature](multi-catalog) add specified_database_list PROPERTY for jdbc/hms/iceberg catalog (#17803)
add specified_database_list PROPERTY for jdbc catalog, user can use many database specified by jdbc catalog
This commit is contained in:
@ -94,11 +94,13 @@ public class JdbcResource extends Resource {
|
||||
DRIVER_URL,
|
||||
TYPE,
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST
|
||||
).build();
|
||||
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST
|
||||
).build();
|
||||
|
||||
// The default value of optional properties
|
||||
@ -108,6 +110,7 @@ public class JdbcResource extends Resource {
|
||||
static {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(SPECIFIED_DATABASE_LIST, "");
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
|
||||
@ -46,6 +46,7 @@ import java.util.stream.Collectors;
|
||||
public abstract class Resource implements Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(OdbcCatalogResource.class);
|
||||
public static final String REFERENCE_SPLIT = "@";
|
||||
public static final String SPECIFIED_DATABASE_LIST = "specified_database_list";
|
||||
|
||||
public enum ResourceType {
|
||||
UNKNOWN,
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Resource;
|
||||
import org.apache.doris.catalog.external.EsExternalDatabase;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
@ -417,4 +418,21 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
public void createDatabase(long dbId, String dbName) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public Map getSpecifiedDatabaseMap() {
|
||||
String specifiedDatabaseList = catalogProperty.getOrDefault(Resource.SPECIFIED_DATABASE_LIST, "");
|
||||
Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
|
||||
specifiedDatabaseList = specifiedDatabaseList.trim();
|
||||
if (specifiedDatabaseList.isEmpty()) {
|
||||
return specifiedDatabaseMap;
|
||||
}
|
||||
String[] databaseList = specifiedDatabaseList.split(",");
|
||||
for (int i = 0; i < databaseList.length; i++) {
|
||||
String dbname = databaseList[i].trim();
|
||||
if (!dbname.isEmpty()) {
|
||||
specifiedDatabaseMap.put(dbname, true);
|
||||
}
|
||||
}
|
||||
return specifiedDatabaseMap;
|
||||
}
|
||||
}
|
||||
|
||||
@ -128,8 +128,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
initCatalogLog.setCatalogId(id);
|
||||
initCatalogLog.setType(InitCatalogLog.Type.HMS);
|
||||
List<String> allDatabases = client.getAllDatabases();
|
||||
Map<String, Boolean> specifiedDatabaseMap = getSpecifiedDatabaseMap();
|
||||
// Update the db name to id map.
|
||||
for (String dbName : allDatabases) {
|
||||
if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) {
|
||||
continue;
|
||||
}
|
||||
long dbId;
|
||||
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
|
||||
dbId = dbNameToId.get(dbName);
|
||||
|
||||
@ -128,10 +128,14 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
|
||||
}
|
||||
|
||||
public String getSpecifiedDatabaseList() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.SPECIFIED_DATABASE_LIST, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass(),
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames());
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames(), getSpecifiedDatabaseMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -67,7 +67,11 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
|
||||
initCatalogLog.setCatalogId(id);
|
||||
initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
|
||||
List<String> allDatabaseNames = listDatabaseNames();
|
||||
Map<String, Boolean> specifiedDatabaseMap = getSpecifiedDatabaseMap();
|
||||
for (String dbName : allDatabaseNames) {
|
||||
if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) {
|
||||
continue;
|
||||
}
|
||||
long dbId;
|
||||
if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
|
||||
dbId = dbNameToId.get(dbName);
|
||||
|
||||
@ -62,14 +62,19 @@ public class JdbcClient {
|
||||
|
||||
private boolean isLowerCaseTableNames = false;
|
||||
|
||||
private Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
|
||||
|
||||
// only used when isLowerCaseTableNames = true.
|
||||
private Map<String, String> lowerTableToRealTable = Maps.newHashMap();
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass,
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames) {
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames, Map specifiedDatabaseMap) {
|
||||
this.jdbcUser = user;
|
||||
this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue();
|
||||
this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue();
|
||||
if (specifiedDatabaseMap != null) {
|
||||
this.specifiedDatabaseMap = specifiedDatabaseMap;
|
||||
}
|
||||
try {
|
||||
this.dbType = JdbcResource.parseDbType(jdbcUrl);
|
||||
} catch (DdlException e) {
|
||||
@ -170,7 +175,7 @@ public class JdbcClient {
|
||||
Connection conn = getConnection();
|
||||
Statement stmt = null;
|
||||
ResultSet rs = null;
|
||||
if (isOnlySpecifiedDatabase) {
|
||||
if (isOnlySpecifiedDatabase && specifiedDatabaseMap.isEmpty()) {
|
||||
return getSpecifiedDatabase(conn);
|
||||
}
|
||||
List<String> databaseNames = Lists.newArrayList();
|
||||
@ -197,9 +202,18 @@ public class JdbcClient {
|
||||
default:
|
||||
throw new JdbcClientException("Not supported jdbc type");
|
||||
}
|
||||
|
||||
List<String> tempDatabaseNames = Lists.newArrayList();
|
||||
while (rs.next()) {
|
||||
databaseNames.add(rs.getString(1));
|
||||
tempDatabaseNames.add(rs.getString(1));
|
||||
}
|
||||
if (isOnlySpecifiedDatabase && !specifiedDatabaseMap.isEmpty()) {
|
||||
for (String db : tempDatabaseNames) {
|
||||
if (specifiedDatabaseMap.get(db) != null) {
|
||||
databaseNames.add(db);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
databaseNames = tempDatabaseNames;
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new JdbcClientException("failed to get database name list from jdbc", e);
|
||||
|
||||
Reference in New Issue
Block a user