From 1956f04aa21328166ca7afc2bcbc53025b52e78b Mon Sep 17 00:00:00 2001 From: xueweizhang Date: Tue, 28 Mar 2023 14:04:41 +0800 Subject: [PATCH] [feature](multi-catalog) add specified_database_list PROPERTY for jdbc/hms/iceberg catalog (#17803) add specified_database_list PROPERTY for jdbc catalog, user can use many database specified by jdbc catalog --- docs/en/docs/lakehouse/multi-catalog/hive.md | 5 +++++ .../docs/lakehouse/multi-catalog/iceberg.md | 5 +++++ docs/en/docs/lakehouse/multi-catalog/jdbc.md | 3 ++- .../docs/lakehouse/multi-catalog/hive.md | 5 +++++ .../docs/lakehouse/multi-catalog/iceberg.md | 5 +++++ .../docs/lakehouse/multi-catalog/jdbc.md | 3 ++- .../apache/doris/catalog/JdbcResource.java | 7 ++++-- .../org/apache/doris/catalog/Resource.java | 1 + .../doris/datasource/ExternalCatalog.java | 18 +++++++++++++++ .../doris/datasource/HMSExternalCatalog.java | 4 ++++ .../doris/datasource/JdbcExternalCatalog.java | 6 ++++- .../iceberg/IcebergExternalCatalog.java | 4 ++++ .../doris/external/jdbc/JdbcClient.java | 22 +++++++++++++++---- .../test_mysql_jdbc_catalog.out | 3 +++ .../test_oracle_jdbc_catalog.out | 3 +++ .../jdbc_catalog_p0/test_pg_jdbc_catalog.out | 3 +++ .../test_mysql_jdbc_catalog.groovy | 20 +++++++++++++++++ .../test_oracle_jdbc_catalog.groovy | 18 +++++++++++++++ .../test_pg_jdbc_catalog.groovy | 18 +++++++++++++++ 19 files changed, 144 insertions(+), 9 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md b/docs/en/docs/lakehouse/multi-catalog/hive.md index b7b7a6db72..f66979284c 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hive.md +++ b/docs/en/docs/lakehouse/multi-catalog/hive.md @@ -55,6 +55,11 @@ CREATE CATALOG hive PROPERTIES ( ``` In addition to `type` and `hive.metastore.uris` , which are required, you can specify other parameters regarding the connection. + +> `specified_database_list`: +> +> only synchronize the specified databases, split with ','. Default values is '' will synchronize all databases. db name is case sensitive. +> For example, to specify HDFS HA: diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md index 4b870dfea8..42d3d31aab 100644 --- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md @@ -53,6 +53,11 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` +> `specified_database_list`: +> +> only synchronize the specified databases, split with ','. Default values is '' will synchronize all databases. db name is case sensitive. +> + ### Iceberg Native Catalog diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md index 0e80a764d7..6fafadc83e 100644 --- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md @@ -188,6 +188,7 @@ CREATE CATALOG hana_catalog PROPERTIES ( | `driver_class ` | Yes | | JDBC Driver Class | | `only_specified_database` | No | "false" | Whether only the database specified to be synchronized. | | `lower_case_table_names` | No | "false" | Whether to synchronize jdbc external data source table names in lower case. | +| `specified_database_list` | No | "" | When only_specified_database=true,only synchronize the specified databases. split with ','. db name is case sensitive.| > `driver_url` can be specified in three ways: > @@ -199,7 +200,7 @@ CREATE CATALOG hana_catalog PROPERTIES ( > `only_specified_database`: > -> When the JDBC is connected, you can specify which database/schema to connect. For example, you can specify the DataBase in mysql `jdbc_url`; you can specify the CurrentSchema in PG `jdbc_url`. `only_specified_database` specifies whether only the database specified to be synchronized. +> When the JDBC is connected, you can specify which database/schema to connect. For example, you can specify the DataBase in mysql `jdbc_url`; you can specify the CurrentSchema in PG `jdbc_url`. When `only_specified_database=true` and `specified_database_list` is empty, only the database in jdbc_url specified to be synchronized. When `only_specified_database=true` and `specified_database_list` with some database names,and these names will specified to be synchronized。 > > If you connect the Oracle database when using this property, please use the version of the jar package above 8 or more (such as ojdbc8.jar). diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md index d38e324342..5a28058c2e 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md @@ -53,6 +53,11 @@ CREATE CATALOG hive PROPERTIES ( ``` 除了 `type` 和 `hive.metastore.uris` 两个必须参数外,还可以通过更多参数来传递连接所需要的信息。 + +> `specified_database_list`: +> +> 支持只同步指定的同步多个database,以','分隔。默认为'',同步所有database。db名称是大小写敏感的。 +> 如提供 HDFS HA 信息,示例如下: diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index 10e12af675..24e3ca20d3 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -51,6 +51,11 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` +> `specified_database_list`: +> +> 支持只同步指定的同步多个database,以','分隔。默认为'',同步所有database。db名称是大小写敏感的。 +> + ### 基于Iceberg API创建Catalog diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index 35dd06ce25..49765e4caa 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -189,6 +189,7 @@ CREATE CATALOG hana_catalog PROPERTIES ( `driver_class` | 是 | | JDBC Driver Class 名称 | `only_specified_database` | 否 | "false" | 指定是否只同步指定的 database | `lower_case_table_names` | 否 | "false" | 是否以小写的形式同步jdbc外部数据源的表名 | +`specified_database_list` | 否 | "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 | > `driver_url` 可以通过以下三种方式指定: > @@ -200,7 +201,7 @@ CREATE CATALOG hana_catalog PROPERTIES ( > `only_specified_database`: > -> 在jdbc连接时可以指定链接到哪个database/schema, 如:mysql中jdbc_url中可以指定database, pg的jdbc_url中可以指定currentSchema。`only_specified_database=true` 可以只同步指定的 database。 +> 在jdbc连接时可以指定链接到哪个database/schema, 如:mysql中jdbc_url中可以指定database, pg的jdbc_url中可以指定currentSchema。`only_specified_database=true` 且`specified_database_list`为空时,可以只同步指定的 database。当`only_specified_database=true`且`specified_database_list`指定了database列表时,则会同步指定的多个database。 > > 如果使用该参数时连接oracle数据库,要求使用ojdbc8.jar以上版本jar包。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 35e655526c..c170186d96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -94,11 +94,13 @@ public class JdbcResource extends Resource { DRIVER_URL, TYPE, ONLY_SPECIFIED_DATABASE, - LOWER_CASE_TABLE_NAMES + LOWER_CASE_TABLE_NAMES, + SPECIFIED_DATABASE_LIST ).build(); private static final ImmutableList OPTIONAL_PROPERTIES = new ImmutableList.Builder().add( ONLY_SPECIFIED_DATABASE, - LOWER_CASE_TABLE_NAMES + LOWER_CASE_TABLE_NAMES, + SPECIFIED_DATABASE_LIST ).build(); // The default value of optional properties @@ -108,6 +110,7 @@ public class JdbcResource extends Resource { static { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(SPECIFIED_DATABASE_LIST, ""); } // timeout for both connection and read. 10 seconds is long enough. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java index 80e9f3d37b..6c39553926 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; public abstract class Resource implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(OdbcCatalogResource.class); public static final String REFERENCE_SPLIT = "@"; + public static final String SPECIFIED_DATABASE_LIST = "specified_database_list"; public enum ResourceType { UNKNOWN, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index d4361f8f78..504624e062 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; @@ -417,4 +418,21 @@ public abstract class ExternalCatalog implements CatalogIf, Wr public void createDatabase(long dbId, String dbName) { throw new NotImplementedException(); } + + public Map getSpecifiedDatabaseMap() { + String specifiedDatabaseList = catalogProperty.getOrDefault(Resource.SPECIFIED_DATABASE_LIST, ""); + Map specifiedDatabaseMap = Maps.newHashMap(); + specifiedDatabaseList = specifiedDatabaseList.trim(); + if (specifiedDatabaseList.isEmpty()) { + return specifiedDatabaseMap; + } + String[] databaseList = specifiedDatabaseList.split(","); + for (int i = 0; i < databaseList.length; i++) { + String dbname = databaseList[i].trim(); + if (!dbname.isEmpty()) { + specifiedDatabaseMap.put(dbname, true); + } + } + return specifiedDatabaseMap; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index ad02f4d055..319227495b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -128,8 +128,12 @@ public class HMSExternalCatalog extends ExternalCatalog { initCatalogLog.setCatalogId(id); initCatalogLog.setType(InitCatalogLog.Type.HMS); List allDatabases = client.getAllDatabases(); + Map specifiedDatabaseMap = getSpecifiedDatabaseMap(); // Update the db name to id map. for (String dbName : allDatabases) { + if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) { + continue; + } long dbId; if (dbNameToId != null && dbNameToId.containsKey(dbName)) { dbId = dbNameToId.get(dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java index 89d7f462a7..5ebbb512b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java @@ -128,10 +128,14 @@ public class JdbcExternalCatalog extends ExternalCatalog { return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false"); } + public String getSpecifiedDatabaseList() { + return catalogProperty.getOrDefault(JdbcResource.SPECIFIED_DATABASE_LIST, ""); + } + @Override protected void initLocalObjectsImpl() { jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass(), - getOnlySpecifiedDatabase(), getLowerCaseTableNames()); + getOnlySpecifiedDatabase(), getLowerCaseTableNames(), getSpecifiedDatabaseMap()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 0777cef518..d7846a9012 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -67,7 +67,11 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { initCatalogLog.setCatalogId(id); initCatalogLog.setType(InitCatalogLog.Type.ICEBERG); List allDatabaseNames = listDatabaseNames(); + Map specifiedDatabaseMap = getSpecifiedDatabaseMap(); for (String dbName : allDatabaseNames) { + if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) { + continue; + } long dbId; if (dbNameToId != null && dbNameToId.containsKey(dbName)) { dbId = dbNameToId.get(dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java index 32ef14f9ce..689667560e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java @@ -62,14 +62,19 @@ public class JdbcClient { private boolean isLowerCaseTableNames = false; + private Map specifiedDatabaseMap = Maps.newHashMap(); + // only used when isLowerCaseTableNames = true. private Map lowerTableToRealTable = Maps.newHashMap(); public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass, - String onlySpecifiedDatabase, String isLowerCaseTableNames) { + String onlySpecifiedDatabase, String isLowerCaseTableNames, Map specifiedDatabaseMap) { this.jdbcUser = user; this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue(); this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue(); + if (specifiedDatabaseMap != null) { + this.specifiedDatabaseMap = specifiedDatabaseMap; + } try { this.dbType = JdbcResource.parseDbType(jdbcUrl); } catch (DdlException e) { @@ -170,7 +175,7 @@ public class JdbcClient { Connection conn = getConnection(); Statement stmt = null; ResultSet rs = null; - if (isOnlySpecifiedDatabase) { + if (isOnlySpecifiedDatabase && specifiedDatabaseMap.isEmpty()) { return getSpecifiedDatabase(conn); } List databaseNames = Lists.newArrayList(); @@ -197,9 +202,18 @@ public class JdbcClient { default: throw new JdbcClientException("Not supported jdbc type"); } - + List tempDatabaseNames = Lists.newArrayList(); while (rs.next()) { - databaseNames.add(rs.getString(1)); + tempDatabaseNames.add(rs.getString(1)); + } + if (isOnlySpecifiedDatabase && !specifiedDatabaseMap.isEmpty()) { + for (String db : tempDatabaseNames) { + if (specifiedDatabaseMap.get(db) != null) { + databaseNames.add(db); + } + } + } else { + databaseNames = tempDatabaseNames; } } catch (SQLException e) { throw new JdbcClientException("failed to get database name list from jdbc", e); diff --git a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out index b7d6e8346e..f7a959eead 100644 --- a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out @@ -175,6 +175,9 @@ doris3 20 -- !specified_database -- doris_test +-- !specified_database -- +doris_test + -- !ex_tb1 -- {"k1":"v1", "k2":"v2"} diff --git a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out index fdfda85e67..4393f91b62 100644 --- a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out @@ -82,6 +82,9 @@ doris3 20 -- !specified_database -- DORIS_TEST +-- !specified_database -- +DORIS_TEST + -- !lower_case_table_names1 -- 1 111 123 7456123.89 573 34 673.43 34.1264 60.0 23.231 diff --git a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out index 67a1b4914c..2c8073daff 100644 --- a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out @@ -2152,6 +2152,9 @@ doris3 20 -- !specified_database -- doris_test +-- !specified_database -- +doris_test + -- !test_old -- 123 abc 123 abc diff --git a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy index 35246942e5..c588c182b2 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy @@ -140,6 +140,26 @@ suite("test_mysql_jdbc_catalog", "p0") { sql """ drop catalog if exists ${catalog_name} """ sql """ drop resource if exists ${resource_name} """ + // test only_specified_database and specified_database_list argument + sql """create resource if not exists ${resource_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://127.0.0.1:${mysql_port}/doris_test?useSSL=false", + "driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "only_specified_database" = "true", + "specified_database_list" = "doris_test" + );""" + + sql """CREATE CATALOG ${catalog_name} WITH RESOURCE ${resource_name}""" + sql """switch ${catalog_name}""" + + qt_specified_database """ show databases; """ + + sql """ drop catalog if exists ${catalog_name} """ + sql """ drop resource if exists ${resource_name} """ + // test old create-catalog syntax for compatibility sql """ CREATE CATALOG ${catalog_name} PROPERTIES ( "type"="jdbc", diff --git a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy index ee20b76706..e0ecd97f2e 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy @@ -105,6 +105,24 @@ suite("test_oracle_jdbc_catalog", "p0") { sql """drop catalog if exists ${catalog_name} """ sql """drop resource if exists ${resource_name}""" + // test only_specified_database and specified_database_list argument + sql """create resource if not exists ${resource_name} properties( + "type"="jdbc", + "user"="doris_test", + "password"="123456", + "jdbc_url" = "jdbc:oracle:thin:@127.0.0.1:${oracle_port}:${SID}", + "driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/ojdbc8.jar", + "driver_class" = "oracle.jdbc.driver.OracleDriver", + "only_specified_database" = "true", + "specified_database_list" = "${ex_db_name}" + );""" + sql """ CREATE CATALOG ${catalog_name} WITH RESOURCE ${resource_name} """ + sql """ switch ${catalog_name} """ + + qt_specified_database """ show databases; """ + sql """drop catalog if exists ${catalog_name} """ + sql """drop resource if exists ${resource_name}""" + // test lower_case_table_names argument sql """create resource if not exists ${resource_name} properties( "type"="jdbc", diff --git a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy index d4e5036a22..d0b3c76d7c 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy @@ -106,6 +106,24 @@ suite("test_pg_jdbc_catalog", "p0") { sql """drop catalog if exists ${catalog_name} """ sql """drop resource if exists ${resource_name}""" + // test only_specified_database and specified_database_list argument + sql """create resource if not exists ${resource_name} properties( + "type"="jdbc", + "user"="postgres", + "password"="123456", + "jdbc_url" = "jdbc:postgresql://127.0.0.1:${pg_port}/postgres?currentSchema=doris_test&useSSL=false", + "driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "only_specified_database" = "true", + "specified_database_list" = "doris_test" + );""" + sql """CREATE CATALOG ${catalog_name} WITH RESOURCE ${resource_name} """ + sql """switch ${catalog_name} """ + qt_specified_database """ show databases; """ + + sql """drop catalog if exists ${catalog_name} """ + sql """drop resource if exists ${resource_name}""" + // test old create-catalog syntax for compatibility sql """ CREATE CATALOG ${catalog_name} PROPERTIES ( "type"="jdbc",