From a20a6d2bea07f77f4f95cdef757c3ea46d6ebddd Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Fri, 2 Jun 2023 17:58:10 +0800 Subject: [PATCH] [refactor](jdbc catalog) Refactor the JdbcClient code (#20109) This PR does the following: 1. This PR is a substantial refactor of the JDBC client architecture. The previous monolithic JDBC client has been refactored into an abstract base class `JdbcClient` and a set of database-specific subclasses (e.g., `JdbcMySQLClient`, `JdbcOracleClient`, etc.), and the configuration required by `JdbcClient` has been abstracted into a `JdbcClientConfig` object. This allows for improved modularity, easier addition of support for new databases, and cleaner, more maintainable code. Apart from the OceanBase change described in item 2, this change is backward-compatible and does not affect existing functionality. 2. As a result of the client refactoring, `JdbcOceanBaseClient` can automatically recognize whether it is operating in MySQL or Oracle mode, so the `oceanbase_mode` property of the JDBC Catalog is removed. Because the property is removed, when creating a single OceanBase JDBC Table, the table type must be set to `oceanbase` (MySQL mode) or `oceanbase_oracle` (Oracle mode). Please note that this is a change in usage behavior. 3. For the PostgreSQL JDBC Catalog, this PR does two things: 1. Adds support for MATERIALIZED VIEW and FOREIGN TABLE. 2. Fixes reading jsonb, which had been incorrectly changed to json in a previous PR. 4. Fix some JDBC catalog test cases. 5. 
modify oceanbase jdbc doc And,Thanks @wolfboys for the guidance --- docs/en/docs/lakehouse/external-table/jdbc.md | 14 +- docs/en/docs/lakehouse/multi-catalog/jdbc.md | 15 +- .../Create/CREATE-CATALOG.md | 3 +- .../docs/lakehouse/external-table/jdbc.md | 14 +- .../docs/lakehouse/multi-catalog/jdbc.md | 14 +- .../Create/CREATE-CATALOG.md | 3 +- .../apache/doris/catalog/JdbcResource.java | 26 +- .../org/apache/doris/catalog/JdbcTable.java | 25 +- .../doris/datasource/JdbcExternalCatalog.java | 23 +- .../external/jdbc/JdbcClickHouseClient.java | 111 ++ .../doris/external/jdbc/JdbcClient.java | 980 ++++-------------- .../doris/external/jdbc/JdbcClientConfig.java | 114 ++ .../doris/external/jdbc/JdbcMySQLClient.java | 177 ++++ .../external/jdbc/JdbcOceanBaseClient.java | 73 ++ .../doris/external/jdbc/JdbcOracleClient.java | 124 +++ .../external/jdbc/JdbcPostgreSQLClient.java | 105 ++ .../external/jdbc/JdbcSQLServerClient.java | 90 ++ .../external/jdbc/JdbcSapHanaClient.java | 95 ++ .../doris/external/jdbc/JdbcTrinoClient.java | 99 ++ .../test_clickhouse_jdbc_catalog.out | Bin 1466 -> 1460 bytes .../test_mysql_jdbc_catalog.out | 11 +- .../jdbc_catalog_p0/test_pg_jdbc_catalog.out | 11 +- 22 files changed, 1242 insertions(+), 885 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClickHouseClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClientConfig.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOceanBaseClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOracleClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcPostgreSQLClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSQLServerClient.java create mode 100644 
fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSapHanaClient.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcTrinoClient.java diff --git a/docs/en/docs/lakehouse/external-table/jdbc.md b/docs/en/docs/lakehouse/external-table/jdbc.md index 2b2844f2bc..c5b782fb6f 100644 --- a/docs/en/docs/lakehouse/external-table/jdbc.md +++ b/docs/en/docs/lakehouse/external-table/jdbc.md @@ -251,10 +251,10 @@ properties ( "password"="", "jdbc_url" = "jdbc:oceanbase://localhost:2881/test", "driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" or "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ); +mysql mode CREATE EXTERNAL TABLE `ext_oceanbase` ( `k1` int ) ENGINE=JDBC @@ -263,6 +263,16 @@ PROPERTIES ( "table" = "test.test", "table_type"="oceanbase" ); + +oracle mode +CREATE EXTERNAL TABLE `ext_oceanbase` ( + `k1` int +) ENGINE=JDBC +PROPERTIES ( + "resource" = "jdbc_oceanbase", + "table" = "test.test", + "table_type"="oceanbase_oracle" +); ``` ### 9.NebulaGraphTest (only supports queries) diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md index 8e3ab3eb60..c61ca75845 100644 --- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md @@ -214,18 +214,7 @@ CREATE CATALOG jdbc_oceanbase_mysql PROPERTIES ( "password"="123456", "jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo", "driver_url" = "oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" -) - -CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES ( - "type"="jdbc", - "user"="root", - "password"="123456", - "jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo", - "driver_url" = "oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ) ``` @@ -240,9 +229,9 
@@ CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES ( | `driver_class ` | Yes | | JDBC Driver Class | | `only_specified_database` | No | "false" | Whether only the database specified to be synchronized. | | `lower_case_table_names` | No | "false" | Whether to synchronize jdbc external data source table names in lower case. | -| `oceanbase_mode` | No | "" | When the connected external data source is OceanBase, the mode must be specified as mysql or oracle | | `include_database_list` | No | "" | When only_specified_database=true,only synchronize the specified databases. split with ','. db name is case sensitive. | | `exclude_database_list` | No | "" | When only_specified_database=true,do not synchronize the specified databases. split with ','. db name is case sensitive. | + > `driver_url` can be specified in three ways: > > 1. File name. For example, `mysql-connector-java-5.1.47.jar`. Please place the Jar file package in `jdbc_drivers/` under the FE/BE deployment directory in advance so the system can locate the file. You can change the location of the file by modifying `jdbc_drivers_dir` in fe.conf and be.conf. 
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md index b174e09d4d..99d4fc424f 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md @@ -163,8 +163,7 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name "password"="", "jdbc_url" = "jdbc:oceanbase://localhost:2881/demo", "driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" or "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ); ``` diff --git a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md index 87894589c2..669d9d22e0 100644 --- a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md @@ -245,10 +245,10 @@ properties ( "password"="", "jdbc_url" = "jdbc:oceanbase://localhost:2881/test", "driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" or "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ); +mysql模式 CREATE EXTERNAL TABLE `ext_oceanbase` ( `k1` int ) ENGINE=JDBC @@ -257,6 +257,16 @@ PROPERTIES ( "table" = "test.test", "table_type"="oceanbase" ); + +oracle模式 +CREATE EXTERNAL TABLE `ext_oceanbase` ( + `k1` int +) ENGINE=JDBC +PROPERTIES ( + "resource" = "jdbc_oceanbase", + "table" = "test.test", + "table_type"="oceanbase_oracle" +); ``` ### 9.Nebula-graph测试 (仅支持查询) diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index f51527ee1d..a73b70170b 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -215,18 +215,7 @@ CREATE CATALOG 
jdbc_oceanbase_mysql PROPERTIES ( "password"="123456", "jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo", "driver_url" = "oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" -) - -CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES ( - "type"="jdbc", - "user"="root", - "password"="123456", - "jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo", - "driver_url" = "oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ) ``` @@ -241,7 +230,6 @@ CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES ( | `driver_class` | 是 | | JDBC Driver Class 名称 | | `only_specified_database` | 否 | "false" | 指定是否只同步指定的 database | | `lower_case_table_names` | 否 | "false" | 是否以小写的形式同步jdbc外部数据源的表名 | -| `oceanbase_mode` | 否 | "" | 当连接的外部数据源为OceanBase时,必须为其指定模式为mysql或oracle | | `include_database_list` | 否 | "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 | | `exclude_database_list` | 否 | "" | 当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。| diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md index ad4e6e0c53..99b0bf5d22 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG.md @@ -167,8 +167,7 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name "password"="", "jdbc_url" = "jdbc:oceanbase://localhost:2881/demo", "driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar", - "driver_class" = "com.oceanbase.jdbc.Driver", - "oceanbase_mode" = "mysql" or "oracle" + "driver_class" = "com.oceanbase.jdbc.Driver" ); ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 576452c969..2e8a71f92a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -94,7 +94,6 @@ public class JdbcResource extends Resource { public static final String TYPE = "type"; public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database"; public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names"; - public static final String OCEANBASE_MODE = "oceanbase_mode"; public static final String CHECK_SUM = "checksum"; private static final ImmutableList ALL_PROPERTIES = new ImmutableList.Builder().add( JDBC_URL, @@ -105,14 +104,12 @@ public class JdbcResource extends Resource { TYPE, ONLY_SPECIFIED_DATABASE, LOWER_CASE_TABLE_NAMES, - OCEANBASE_MODE, INCLUDE_DATABASE_LIST, EXCLUDE_DATABASE_LIST ).build(); private static final ImmutableList OPTIONAL_PROPERTIES = new ImmutableList.Builder().add( ONLY_SPECIFIED_DATABASE, LOWER_CASE_TABLE_NAMES, - OCEANBASE_MODE, INCLUDE_DATABASE_LIST, EXCLUDE_DATABASE_LIST ).build(); @@ -124,7 +121,6 @@ public class JdbcResource extends Resource { static { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false"); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(OCEANBASE_MODE, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, ""); } @@ -153,7 +149,7 @@ public class JdbcResource extends Resource { for (String propertyKey : ALL_PROPERTIES) { replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey)); } - this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE))); + this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL))); super.modifyProperties(properties); } @@ -186,7 +182,7 @@ public class JdbcResource extends 
Resource { throw new DdlException("JdbcResource Missing " + property + " in properties"); } } - this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE))); + this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL))); configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL))); } @@ -269,7 +265,7 @@ public class JdbcResource extends Resource { } } - public static String parseDbType(String url, String oceanbaseMode) throws DdlException { + public static String parseDbType(String url) throws DdlException { if (url.startsWith(JDBC_MYSQL) || url.startsWith(JDBC_MARIADB)) { return MYSQL; } else if (url.startsWith(JDBC_POSTGRESQL)) { @@ -287,27 +283,17 @@ public class JdbcResource extends Resource { } else if (url.startsWith(JDBC_PRESTO)) { return PRESTO; } else if (url.startsWith(JDBC_OCEANBASE)) { - if (oceanbaseMode == null || oceanbaseMode.isEmpty()) { - throw new DdlException("OceanBase mode must be specified for OceanBase databases" - + "(either 'mysql' or 'oracle')"); - } - if (oceanbaseMode.equalsIgnoreCase("mysql")) { - return OCEANBASE; - } else if (oceanbaseMode.equalsIgnoreCase("oracle")) { - return OCEANBASE_ORACLE; - } else { - throw new DdlException("Invalid OceanBase mode: " + oceanbaseMode + ". Must be 'mysql' or 'oracle'"); - } + return OCEANBASE; } else if (url.startsWith(JDBC_NEBULA)) { return NEBULA; } throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url); } - public static String handleJdbcUrl(String jdbcUrl, String oceanbaseMode) throws DdlException { + public static String handleJdbcUrl(String jdbcUrl) throws DdlException { // delete all space in jdbcUrl String newJdbcUrl = jdbcUrl.replaceAll(" ", ""); - String dbType = parseDbType(newJdbcUrl, oceanbaseMode); + String dbType = parseDbType(newJdbcUrl); if (dbType.equals(MYSQL) || dbType.equals(OCEANBASE)) { // `yearIsDateType` is a parameter of JDBC, and the default is true. 
// We force the use of `yearIsDateType=false` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 5fa608bd80..432cfd0aae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -39,7 +39,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -287,10 +286,7 @@ public class JdbcTable extends Table { throw new DdlException("property " + TABLE_TYPE + " must be set"); } - Map tableTypeMapWithoutOceanbaseOracle = new HashMap<>(TABLE_TYPE_MAP); - tableTypeMapWithoutOceanbaseOracle.remove("oceanbase_oracle"); - - if (!tableTypeMapWithoutOceanbaseOracle.containsKey(jdbcTypeName.toLowerCase())) { + if (!TABLE_TYPE_MAP.containsKey(jdbcTypeName.toLowerCase())) { throw new DdlException("Unknown jdbc table type: " + jdbcTypeName); } @@ -310,21 +306,10 @@ public class JdbcTable extends Table { driverUrl = jdbcResource.getProperty(DRIVER_URL); checkSum = jdbcResource.getProperty(CHECK_SUM); - if (!jdbcTypeName.equalsIgnoreCase(jdbcUrl.split(":")[1])) { - throw new DdlException("property " + TABLE_TYPE + " must be same with resource url"); - } - - // get oceanbase_mode - String oceanbaseMode = jdbcResource.getProperty("oceanbase_mode"); - - // by oceanbase_mode set jdbcTypeName - if ("oceanbase".equalsIgnoreCase(jdbcTypeName)) { - if ("mysql".equalsIgnoreCase(oceanbaseMode)) { - jdbcTypeName = "oceanbase"; - } else if ("oracle".equalsIgnoreCase(oceanbaseMode)) { - jdbcTypeName = "oceanbase_oracle"; - } else { - throw new DdlException("Unknown oceanbase_mode: " + oceanbaseMode); + String urlType = jdbcUrl.split(":")[1]; + if (!jdbcTypeName.equalsIgnoreCase(urlType)) { + if (!(jdbcTypeName.equalsIgnoreCase("oceanbase_oracle") && 
urlType.equalsIgnoreCase("oceanbase"))) { + throw new DdlException("property " + TABLE_TYPE + " must be same with resource url"); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java index ca373daba8..9b0700b81c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.JdbcResource; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.common.DdlException; import org.apache.doris.external.jdbc.JdbcClient; +import org.apache.doris.external.jdbc.JdbcClientConfig; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -78,9 +79,8 @@ public class JdbcExternalCatalog extends ExternalCatalog { properties.put(StringUtils.removeStart(kv.getKey(), JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue()); } String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, ""); - String oceanbaseMode = properties.getOrDefault(JdbcResource.OCEANBASE_MODE, ""); if (!Strings.isNullOrEmpty(jdbcUrl)) { - jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl, oceanbaseMode); + jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl); properties.put(JdbcResource.JDBC_URL, jdbcUrl); } @@ -127,15 +127,20 @@ public class JdbcExternalCatalog extends ExternalCatalog { return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false"); } - public String getOceanBaseMode() { - return catalogProperty.getOrDefault(JdbcResource.OCEANBASE_MODE, ""); - } - @Override protected void initLocalObjectsImpl() { - jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), - getDriverClass(), getOnlySpecifiedDatabase(), getLowerCaseTableNames(), - getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap()); + JdbcClientConfig 
jdbcClientConfig = new JdbcClientConfig() + .setUser(getJdbcUser()) + .setPassword(getJdbcPasswd()) + .setJdbcUrl(getJdbcUrl()) + .setDriverUrl(getDriverUrl()) + .setDriverClass(getDriverClass()) + .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) + .setIsLowerCaseTableNames(getLowerCaseTableNames()) + .setIncludeDatabaseMap(getIncludeDatabaseMap()) + .setExcludeDatabaseMap(getExcludeDatabaseMap()); + + jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); } protected List listDatabaseNames() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClickHouseClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClickHouseClient.java new file mode 100644 index 0000000000..794fb868ff --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClickHouseClient.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +public class JdbcClickHouseClient extends JdbcClient { + + protected JdbcClickHouseClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SHOW DATABASES"; + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + + String ckType = fieldSchema.getDataTypeName(); + + if (ckType.startsWith("LowCardinality")) { + ckType = ckType.substring(15, ckType.length() - 1); + if (ckType.startsWith("Nullable")) { + ckType = ckType.substring(9, ckType.length() - 1); + } + } else if (ckType.startsWith("Nullable")) { + ckType = ckType.substring(9, ckType.length() - 1); + } + + if (ckType.startsWith("Decimal")) { + String[] accuracy = ckType.substring(8, ckType.length() - 1).split(", "); + int precision = Integer.parseInt(accuracy[0]); + int scale = Integer.parseInt(accuracy[1]); + return createDecimalOrStringType(precision, scale); + } + + if ("String".contains(ckType) || ckType.startsWith("Enum") + || ckType.startsWith("IPv") || "UUID".contains(ckType) + || ckType.startsWith("FixedString")) { + return ScalarType.createStringType(); + } + + if (ckType.startsWith("DateTime")) { + // DateTime with second precision, DateTime64 with [0~9] precision + if (ckType.equals("DateTime")) { + return ScalarType.createDatetimeV2Type(0); + } else { + // will lose precision + return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); + } + } + + if (ckType.startsWith("Array")) { + String cktype = ckType.substring(6, ckType.length() - 1); + fieldSchema.setDataTypeName(cktype); + Type type = jdbcTypeToDoris(fieldSchema); + return ArrayType.create(type, true); + } + + switch (ckType) { + case "Bool": + return Type.BOOLEAN; + case "Int8": + return Type.TINYINT; + case "Int16": + case "UInt8": + return Type.SMALLINT; + 
case "Int32": + case "UInt16": + return Type.INT; + case "Int64": + case "UInt32": + return Type.BIGINT; + case "Int128": + case "UInt64": + return Type.LARGEINT; + case "Int256": + case "UInt128": + case "UInt256": + return ScalarType.createStringType(); + case "Float32": + return Type.FLOAT; + case "Float64": + return Type.DOUBLE; + case "Date": + case "Date32": + return ScalarType.createDateV2Type(); + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java index 99ada8893b..bf795d3364 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java @@ -17,10 +17,8 @@ package org.apache.doris.external.jdbc; -import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.JdbcResource; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; @@ -42,56 +40,74 @@ import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; @Getter -public class JdbcClient { +public abstract class JdbcClient { private static final Logger LOG = LogManager.getLogger(JdbcClient.class); - private static final int HTTP_TIMEOUT_MS = 10000; + protected static final int JDBC_DATETIME_SCALE = 6; - public static final int JDBC_DATETIME_SCALE = 6; - - private String dbType; - private String jdbcUser; - - private URLClassLoader classLoader = null; - - private DruidDataSource dataSource = null; - private boolean isOnlySpecifiedDatabase = false; - - private boolean isLowerCaseTableNames = 
false; - - private Map includeDatabaseMap = Maps.newHashMap(); - private Map excludeDatabaseMap = Maps.newHashMap(); + protected String dbType; + protected String jdbcUser; + protected URLClassLoader classLoader = null; + protected DruidDataSource dataSource = null; + protected boolean isOnlySpecifiedDatabase; + protected boolean isLowerCaseTableNames; + protected String oceanbaseMode = ""; + protected Map includeDatabaseMap; + protected Map excludeDatabaseMap; // only used when isLowerCaseTableNames = true. - private Map lowerTableToRealTable = Maps.newHashMap(); - + protected Map lowerTableToRealTable = Maps.newHashMap(); // only used when isLowerCaseTableNames = true. - private Map lowerDBToRealDB = Maps.newHashMap(); + protected Map lowerDBToRealDB = Maps.newHashMap(); - private String oceanbaseMode = ""; + public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { + String dbType = parseDbType(jdbcClientConfig.getJdbcUrl()); + switch (dbType) { + case JdbcResource.MYSQL: + return new JdbcMySQLClient(jdbcClientConfig); + case JdbcResource.OCEANBASE: + return new JdbcOceanBaseClient(jdbcClientConfig); + case JdbcResource.POSTGRESQL: + return new JdbcPostgreSQLClient(jdbcClientConfig); + case JdbcResource.ORACLE: + return new JdbcOracleClient(jdbcClientConfig); + case JdbcResource.SQLSERVER: + return new JdbcSQLServerClient(jdbcClientConfig); + case JdbcResource.CLICKHOUSE: + return new JdbcClickHouseClient(jdbcClientConfig); + case JdbcResource.SAP_HANA: + return new JdbcSapHanaClient(jdbcClientConfig); + case JdbcResource.TRINO: + case JdbcResource.PRESTO: + return new JdbcTrinoClient(jdbcClientConfig); + default: + throw new IllegalArgumentException("Unsupported DB type: " + dbType); + } + } - public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass, - String onlySpecifiedDatabase, String isLowerCaseTableNames, String oceanbaseMode, Map includeDatabaseMap, - Map excludeDatabaseMap) { - 
this.jdbcUser = user; - this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue(); - this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue(); - if (includeDatabaseMap != null) { - this.includeDatabaseMap = includeDatabaseMap; - } - if (excludeDatabaseMap != null) { - this.excludeDatabaseMap = excludeDatabaseMap; - } - this.oceanbaseMode = oceanbaseMode; - try { - this.dbType = JdbcResource.parseDbType(jdbcUrl, oceanbaseMode); - } catch (DdlException e) { - throw new JdbcClientException("Failed to parse db type from jdbcUrl: " + jdbcUrl, e); - } + protected JdbcClient(JdbcClientConfig jdbcClientConfig) { + this.jdbcUser = jdbcClientConfig.getUser(); + this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); + this.isLowerCaseTableNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseTableNames()); + this.includeDatabaseMap = + Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap()); + this.excludeDatabaseMap = + Optional.ofNullable(jdbcClientConfig.getExcludeDatabaseMap()).orElse(Collections.emptyMap()); + String jdbcUrl = jdbcClientConfig.getJdbcUrl(); + this.dbType = parseDbType(jdbcUrl); + initializeDataSource(jdbcClientConfig.getPassword(), jdbcUrl, jdbcClientConfig.getDriverUrl(), + jdbcClientConfig.getDriverClass()); + } + + // Initialize DruidDataSource + private void initializeDataSource(String password, String jdbcUrl, String driverUrl, String driverClass) { ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); try { // TODO(ftw): The problem here is that the jar package is handled by FE @@ -127,12 +143,20 @@ public class JdbcClient { } } + private static String parseDbType(String jdbcUrl) { + try { + return JdbcResource.parseDbType(jdbcUrl); + } catch (DdlException e) { + throw new JdbcClientException("Failed to parse db type from jdbcUrl: " + jdbcUrl, e); + } + } + public void closeClient() { 
dataSource.close(); } public Connection getConnection() throws JdbcClientException { - Connection conn = null; + Connection conn; try { conn = dataSource.getConnection(); } catch (Exception e) { @@ -141,46 +165,19 @@ public class JdbcClient { return conn; } - // close connection - public void close(Object o) { - if (o == null) { - return; - } - if (o instanceof ResultSet) { - try { - ((ResultSet) o).close(); - } catch (SQLException e) { - throw new JdbcClientException("Can not close ResultSet ", e); - } - } else if (o instanceof Statement) { - try { - ((Statement) o).close(); - } catch (SQLException e) { - throw new JdbcClientException("Can not close Statement ", e); - } - } else if (o instanceof Connection) { - Connection c = (Connection) o; - try { - if (!c.isClosed()) { - c.close(); + public void close(AutoCloseable... closeables) { + for (AutoCloseable closeable : closeables) { + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + throw new JdbcClientException("Can not close : ", e); } - } catch (SQLException e) { - throw new JdbcClientException("Can not close Connection ", e); } } } - public void close(ResultSet rs, Statement stmt, Connection conn) { - close(rs); - close(stmt); - close(conn); - } - - public void close(ResultSet rs, Connection conn) { - close(rs); - close(conn); - } - + // This part used to process meta-information of database, table and column. 
/** * get all database name through JDBC * @return list of database names @@ -195,33 +192,8 @@ public class JdbcClient { List databaseNames = Lists.newArrayList(); try { stmt = conn.createStatement(); - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.CLICKHOUSE: - case JdbcResource.OCEANBASE: - rs = stmt.executeQuery("SHOW DATABASES"); - break; - case JdbcResource.POSTGRESQL: - rs = stmt.executeQuery("SELECT nspname FROM pg_namespace WHERE has_schema_privilege(" - + "'" + jdbcUser + "', nspname, 'USAGE');"); - break; - case JdbcResource.ORACLE: - case JdbcResource.OCEANBASE_ORACLE: - rs = stmt.executeQuery("SELECT DISTINCT OWNER FROM all_tables"); - break; - case JdbcResource.SQLSERVER: - rs = stmt.executeQuery("SELECT name FROM sys.schemas"); - break; - case JdbcResource.SAP_HANA: - rs = stmt.executeQuery("SELECT SCHEMA_NAME FROM SYS.SCHEMAS WHERE HAS_PRIVILEGES = 'TRUE'"); - break; - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - rs = stmt.executeQuery("SHOW SCHEMAS"); - break; - default: - throw new JdbcClientException("Not supported jdbc type"); - } + String sql = getDatabaseQuery(); + rs = stmt.executeQuery(sql); List tempDatabaseNames = Lists.newArrayList(); while (rs.next()) { String databaseName = rs.getString(1); @@ -253,154 +225,53 @@ public class JdbcClient { return databaseNames; } - public List getSpecifiedDatabase(Connection conn) { - List databaseNames = Lists.newArrayList(); - try { - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.OCEANBASE: - databaseNames.add(conn.getCatalog()); - break; - case JdbcResource.CLICKHOUSE: - case JdbcResource.POSTGRESQL: - case JdbcResource.ORACLE: - case JdbcResource.SQLSERVER: - case JdbcResource.SAP_HANA: - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - case JdbcResource.OCEANBASE_ORACLE: - databaseNames.add(conn.getSchema()); - break; - default: - throw new JdbcClientException("Not supported jdbc type"); - } - } catch (SQLException e) { - throw new 
JdbcClientException("failed to get specified database name from jdbc", e); - } finally { - close(conn); - } - return databaseNames; - } - /** * get all tables of one database */ public List getTablesNameList(String dbName) { - Connection conn = getConnection(); - ResultSet rs = null; + List tablesName = Lists.newArrayList(); + String[] tableTypes = getTableTypes(); if (isLowerCaseTableNames) { dbName = lowerDBToRealDB.get(dbName); } - List tablesName = Lists.newArrayList(); - String[] types = {"TABLE", "VIEW"}; - String[] hanaTypes = {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; - try { - DatabaseMetaData databaseMetaData = conn.getMetaData(); - String catalogName = conn.getCatalog(); - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.OCEANBASE: - rs = databaseMetaData.getTables(dbName, null, null, types); - break; - case JdbcResource.POSTGRESQL: - case JdbcResource.ORACLE: - case JdbcResource.CLICKHOUSE: - case JdbcResource.SQLSERVER: - case JdbcResource.OCEANBASE_ORACLE: - rs = databaseMetaData.getTables(null, dbName, null, types); - break; - case JdbcResource.SAP_HANA: - rs = databaseMetaData.getTables(null, dbName, null, hanaTypes); - break; - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - rs = databaseMetaData.getTables(catalogName, dbName, null, types); - break; - default: - throw new JdbcClientException("Unknown database type"); - } - while (rs.next()) { - String tableName = rs.getString("TABLE_NAME"); - if (isLowerCaseTableNames) { - lowerTableToRealTable.put(tableName.toLowerCase(), tableName); - tableName = tableName.toLowerCase(); + String finalDbName = dbName; + processTable(dbName, null, tableTypes, (rs) -> { + try { + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + if (isLowerCaseTableNames) { + lowerTableToRealTable.put(tableName.toLowerCase(), tableName); + tableName = tableName.toLowerCase(); + } + tablesName.add(tableName); } - tablesName.add(tableName); + } catch 
(SQLException e) { + throw new JdbcClientException("failed to get all tables for db %s", e, finalDbName); } - } catch (SQLException e) { - throw new JdbcClientException("failed to get all tables for db %s", e, dbName); - } finally { - close(rs, conn); - } + }); return tablesName; } public boolean isTableExist(String dbName, String tableName) { - Connection conn = getConnection(); - ResultSet rs = null; + final boolean[] isExist = {false}; if (isLowerCaseTableNames) { dbName = lowerDBToRealDB.get(dbName); tableName = lowerTableToRealTable.get(tableName); } - String[] types = {"TABLE", "VIEW"}; - String[] hanaTypes = {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; - try { - DatabaseMetaData databaseMetaData = conn.getMetaData(); - String catalogName = conn.getCatalog(); - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.OCEANBASE: - rs = databaseMetaData.getTables(dbName, null, tableName, types); - break; - case JdbcResource.POSTGRESQL: - case JdbcResource.ORACLE: - case JdbcResource.CLICKHOUSE: - case JdbcResource.SQLSERVER: - case JdbcResource.OCEANBASE_ORACLE: - rs = databaseMetaData.getTables(null, dbName, null, types); - break; - case JdbcResource.SAP_HANA: - rs = databaseMetaData.getTables(null, dbName, null, hanaTypes); - break; - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - rs = databaseMetaData.getTables(catalogName, dbName, null, types); - break; - default: - throw new JdbcClientException("Unknown database type: " + dbType); + String[] tableTypes = getTableTypes(); + String finalTableName = tableName; + String finalDbName = dbName; + processTable(dbName, tableName, tableTypes, (rs) -> { + try { + if (rs.next()) { + isExist[0] = true; + } + } catch (SQLException e) { + throw new JdbcClientException("failed to judge if table exist for table %s in db %s", + e, finalTableName, finalDbName); } - if (rs.next()) { - return true; - } else { - return false; - } - } catch (SQLException e) { - throw new 
JdbcClientException("failed to judge if table exist for table %s in db %s", e, tableName, dbName); - } finally { - close(rs, conn); - } - } - - @Data - private class JdbcFieldSchema { - private String columnName; - // The SQL type of the corresponding java.sql.types (Type ID) - private int dataType; - // The SQL type of the corresponding java.sql.types (Type Name) - private String dataTypeName; - // For CHAR/DATA, columnSize means the maximum number of chars. - // For NUMERIC/DECIMAL, columnSize means precision. - private int columnSize; - private int decimalDigits; - // Base number (usually 10 or 2) - private int numPrecRadix; - // column description - private String remarks; - // This length is the maximum number of bytes for CHAR type - // for utf8 encoding, if columnSize=10, then charOctetLength=30 - // because for utf8 encoding, a Chinese character takes up 3 bytes - private int charOctetLength; - private boolean isAllowNull; + }); + return isExist[0]; } /** @@ -418,53 +289,13 @@ public class JdbcClient { } try { DatabaseMetaData databaseMetaData = conn.getMetaData(); - String catalogName = conn.getCatalog(); - String modifiedTableName; - boolean isModify = false; - // getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) - // catalog - the catalog of this table, `null` means all catalogs - // schema - The schema of the table; corresponding to tablespace in Oracle - // `null` means get all schema; - // Can contain single-character wildcards ("_"), or multi-character wildcards ("%") - // tableNamePattern - table name - // Can contain single-character wildcards ("_"), or multi-character wildcards ("%") - // columnNamePattern - column name, `null` means get all columns - // Can contain single-character wildcards ("_"), or multi-character wildcards ("%") - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.OCEANBASE: - rs = databaseMetaData.getColumns(dbName, null, tableName, null); - break; - case 
JdbcResource.POSTGRESQL: - case JdbcResource.CLICKHOUSE: - case JdbcResource.SQLSERVER: - case JdbcResource.SAP_HANA: - case JdbcResource.OCEANBASE_ORACLE: - rs = databaseMetaData.getColumns(null, dbName, tableName, null); - break; - case JdbcResource.ORACLE: - modifiedTableName = tableName.replace("/", "%"); - if (!modifiedTableName.equals(tableName)) { - isModify = true; - } - rs = databaseMetaData.getColumns(null, dbName, modifiedTableName, null); - break; - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - rs = databaseMetaData.getColumns(catalogName, dbName, tableName, null); - break; - default: - throw new JdbcClientException("Unknown database type"); - } + String catalogName = getCatalogName(conn); + tableName = modifyTableNameIfNecessary(tableName); + rs = getColumns(databaseMetaData, catalogName, dbName, tableName); while (rs.next()) { - // for oracle special table name - if (isModify) { - String actualTableName = rs.getString("TABLE_NAME"); - if (!tableName.equals(actualTableName)) { - continue; - } + if (isTableModified(tableName, rs.getString("TABLE_NAME"))) { + continue; } - JdbcFieldSchema field = new JdbcFieldSchema(); field.setColumnName(rs.getString("COLUMN_NAME")); field.setDataType(rs.getInt("DATA_TYPE")); @@ -472,13 +303,13 @@ public class JdbcClient { field.setColumnSize(rs.getInt("COLUMN_SIZE")); field.setDecimalDigits(rs.getInt("DECIMAL_DIGITS")); field.setNumPrecRadix(rs.getInt("NUM_PREC_RADIX")); - /** - * Whether it is allowed to be NULL - * 0 (columnNoNulls) - * 1 (columnNullable) - * 2 (columnNullableUnknown) + /* + Whether it is allowed to be NULL + 0 (columnNoNulls) + 1 (columnNullable) + 2 (columnNullableUnknown) */ - field.setAllowNull(rs.getInt("NULLABLE") == 0 ? 
false : true); + field.setAllowNull(rs.getInt("NULLABLE") != 0); field.setRemarks(rs.getString("REMARKS")); field.setCharOctetLength(rs.getInt("CHAR_OCTET_LENGTH")); tableSchema.add(field); @@ -492,504 +323,6 @@ public class JdbcClient { return tableSchema; } - public Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { - switch (dbType) { - case JdbcResource.MYSQL: - case JdbcResource.OCEANBASE: - return mysqlTypeToDoris(fieldSchema); - case JdbcResource.POSTGRESQL: - return postgresqlTypeToDoris(fieldSchema); - case JdbcResource.CLICKHOUSE: - return clickhouseTypeToDoris(fieldSchema); - case JdbcResource.ORACLE: - case JdbcResource.OCEANBASE_ORACLE: - return oracleTypeToDoris(fieldSchema); - case JdbcResource.SQLSERVER: - return sqlserverTypeToDoris(fieldSchema); - case JdbcResource.SAP_HANA: - return saphanaTypeToDoris(fieldSchema); - case JdbcResource.TRINO: - case JdbcResource.PRESTO: - return trinoTypeToDoris(fieldSchema); - default: - throw new JdbcClientException("Unknown database type"); - } - } - - public Type mysqlTypeToDoris(JdbcFieldSchema fieldSchema) { - // For mysql type: "INT UNSIGNED": - // fieldSchema.getDataTypeName().split(" ")[0] == "INT" - // fieldSchema.getDataTypeName().split(" ")[1] == "UNSIGNED" - String[] typeFields = fieldSchema.getDataTypeName().split(" "); - String mysqlType = typeFields[0]; - // For unsigned int, should extend the type. 
- if (typeFields.length > 1 && "UNSIGNED".equals(typeFields[1])) { - switch (mysqlType) { - case "TINYINT": - return Type.SMALLINT; - case "SMALLINT": - return Type.INT; - case "MEDIUMINT": - return Type.INT; - case "INT": - return Type.BIGINT; - case "BIGINT": - return Type.LARGEINT; - case "DECIMAL": - int precision = fieldSchema.getColumnSize() + 1; - int scale = fieldSchema.getDecimalDigits(); - return createDecimalOrStringType(precision, scale); - case "DOUBLE": - // As of MySQL 8.0.17, the UNSIGNED attribute is deprecated - // for columns of type FLOAT, DOUBLE, and DECIMAL (and any synonyms) - // https://dev.mysql.com/doc/refman/8.0/en/numeric-type-syntax.html - // The maximum value may cause errors due to insufficient accuracy - return Type.DOUBLE; - case "FLOAT": - return Type.FLOAT; - default: - throw new JdbcClientException("Unknown UNSIGNED type of mysql, type: [" + mysqlType + "]"); - } - } - switch (mysqlType) { - case "BOOLEAN": - return Type.BOOLEAN; - case "TINYINT": - return Type.TINYINT; - case "SMALLINT": - case "YEAR": - return Type.SMALLINT; - case "MEDIUMINT": - case "INT": - return Type.INT; - case "BIGINT": - return Type.BIGINT; - case "LARGEINT": // for jdbc catalog connecting Doris database - return Type.LARGEINT; - case "DATE": - case "DATEV2": - return ScalarType.createDateV2Type(); - case "TIMESTAMP": - case "DATETIME": - case "DATETIMEV2": // for jdbc catalog connecting Doris database - // mysql can support microsecond - // todo(gaoxin): Get real precision of DATETIMEV2 - return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); - case "FLOAT": - return Type.FLOAT; - case "DOUBLE": - return Type.DOUBLE; - case "DECIMAL": - case "DECIMALV3": // for jdbc catalog connecting Doris database - int precision = fieldSchema.getColumnSize(); - int scale = fieldSchema.getDecimalDigits(); - return createDecimalOrStringType(precision, scale); - case "CHAR": - ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); - 
charType.setLength(fieldSchema.columnSize); - return charType; - case "VARCHAR": - return ScalarType.createVarcharType(fieldSchema.columnSize); - case "TIME": - case "TINYTEXT": - case "TEXT": - case "MEDIUMTEXT": - case "LONGTEXT": - case "TINYBLOB": - case "BLOB": - case "MEDIUMBLOB": - case "LONGBLOB": - case "TINYSTRING": - case "STRING": - case "MEDIUMSTRING": - case "LONGSTRING": - case "JSON": - case "SET": - case "BIT": - case "BINARY": - case "VARBINARY": - case "ENUM": - return ScalarType.createStringType(); - default: - return Type.UNSUPPORTED; - } - } - - public Type postgresqlTypeToDoris(JdbcFieldSchema fieldSchema) { - String pgType = fieldSchema.getDataTypeName(); - switch (pgType) { - case "int2": - case "smallserial": - return Type.SMALLINT; - case "int4": - case "serial": - return Type.INT; - case "int8": - case "bigserial": - return Type.BIGINT; - case "numeric": { - int precision = fieldSchema.getColumnSize(); - int scale = fieldSchema.getDecimalDigits(); - return createDecimalOrStringType(precision, scale); - } - case "float4": - return Type.FLOAT; - case "float8": - return Type.DOUBLE; - case "bpchar": - ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); - charType.setLength(fieldSchema.columnSize); - return charType; - case "timestamp": - case "timestamptz": - // postgres can support microsecond - return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); - case "date": - return ScalarType.createDateV2Type(); - case "bool": - return Type.BOOLEAN; - case "bit": - if (fieldSchema.getColumnSize() == 1) { - return Type.BOOLEAN; - } else { - return ScalarType.createStringType(); - } - case "point": - case "line": - case "lseg": - case "box": - case "path": - case "polygon": - case "circle": - case "varchar": - case "text": - case "time": - case "timetz": - case "interval": - case "cidr": - case "inet": - case "macaddr": - case "varbit": - case "json": - case "uuid": - case "bytea": - return ScalarType.createStringType(); - default: 
- return Type.UNSUPPORTED; - } - } - - public Type clickhouseTypeToDoris(JdbcFieldSchema fieldSchema) { - String ckType = fieldSchema.getDataTypeName(); - if (ckType.startsWith("LowCardinality")) { - ckType = ckType.substring(15, ckType.length() - 1); - if (ckType.startsWith("Nullable")) { - ckType = ckType.substring(9, ckType.length() - 1); - } - } else if (ckType.startsWith("Nullable")) { - ckType = ckType.substring(9, ckType.length() - 1); - } - if (ckType.startsWith("Decimal")) { - String[] accuracy = ckType.substring(8, ckType.length() - 1).split(", "); - int precision = Integer.parseInt(accuracy[0]); - int scale = Integer.parseInt(accuracy[1]); - return createDecimalOrStringType(precision, scale); - } else if ("String".contains(ckType) || ckType.startsWith("Enum") - || ckType.startsWith("IPv") || "UUID".contains(ckType) - || ckType.startsWith("FixedString")) { - return ScalarType.createStringType(); - } else if (ckType.startsWith("DateTime")) { - // DateTime with second precision, DateTime64 with [0~9] precision - if (ckType.equals("DateTime")) { - return ScalarType.createDatetimeV2Type(0); - } else { - // will lose precision - return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); - } - } else if (ckType.startsWith("Array")) { - String cktype = ckType.substring(6, ckType.length() - 1); - fieldSchema.setDataTypeName(cktype); - Type type = clickhouseTypeToDoris(fieldSchema); - return ArrayType.create(type, true); - } - switch (ckType) { - case "Bool": - return Type.BOOLEAN; - case "Int8": - return Type.TINYINT; - case "Int16": - case "UInt8": - return Type.SMALLINT; - case "Int32": - case "UInt16": - return Type.INT; - case "Int64": - case "UInt32": - return Type.BIGINT; - case "Int128": - case "UInt64": - return Type.LARGEINT; - case "Int256": - case "UInt128": - case "UInt256": - return ScalarType.createStringType(); - case "Float32": - return Type.FLOAT; - case "Float64": - return Type.DOUBLE; - case "Date": - case "Date32": - return 
ScalarType.createDateV2Type(); - default: - return Type.UNSUPPORTED; - } - } - - public Type oracleTypeToDoris(JdbcFieldSchema fieldSchema) { - String oracleType = fieldSchema.getDataTypeName(); - if (oracleType.startsWith("INTERVAL")) { - oracleType = oracleType.substring(0, 8); - } else if (oracleType.startsWith("TIMESTAMP")) { - if (oracleType.equals("TIMESTAMPTZ") || oracleType.equals("TIMESTAMPLTZ")) { - return Type.UNSUPPORTED; - } - // oracle can support nanosecond, will lose precision - return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); - } - switch (oracleType) { - /** - * The data type NUMBER(p,s) of oracle has some different of doris decimal type in semantics. - * For Oracle Number(p,s) type: - * 1. if s<0 , it means this is an Interger. - * This NUMBER(p,s) has (p+|s| ) significant digit, and rounding will be performed at s position. - * eg: if we insert 1234567 into NUMBER(5,-2) type, then the oracle will store 1234500. - * In this case, Doris will use INT type (TINYINT/SMALLINT/INT/.../LARGEINT). - * 2. if s>=0 && s
<p , it just like doris Decimal(p,s) behavior. - * 3. if s>
=0 && s>p, it means this is a decimal(like 0.xxxxx). - * p represents how many digits can be left to the left after the decimal point, - * the figure after the decimal point s will be rounded. - * eg: we can not insert 0.0123456 into NUMBER(5,7) type, - * because there must be two zeros on the right side of the decimal point, - * we can insert 0.0012345 into NUMBER(5,7) type. - * In this case, Doris will use DECIMAL(s,s) - * 4. if we don't specify p and s for NUMBER(p,s), just NUMBER, the p and s of NUMBER are uncertain. - * In this case, doris can not determine p and s, so doris can not determine data type. - */ - case "NUMBER": - int precision = fieldSchema.getColumnSize(); - int scale = fieldSchema.getDecimalDigits(); - if (scale <= 0) { - precision -= scale; - if (precision < 3) { - return Type.TINYINT; - } else if (precision < 5) { - return Type.SMALLINT; - } else if (precision < 10) { - return Type.INT; - } else if (precision < 19) { - return Type.BIGINT; - } else if (precision < 39) { - // LARGEINT supports up to 38 numbers. - return Type.LARGEINT; - } else { - return ScalarType.createStringType(); - } - } - // scale > 0 - if (precision < scale) { - precision = scale; - } - return createDecimalOrStringType(precision, scale); - case "FLOAT": - return Type.DOUBLE; - case "DATE": - // can save date and time with second precision - return ScalarType.createDatetimeV2Type(0); - case "VARCHAR2": - case "NVARCHAR2": - case "CHAR": - case "NCHAR": - case "LONG": - case "RAW": - case "LONG RAW": - case "INTERVAL": - return ScalarType.createStringType(); - case "BLOB": - case "CLOB": - case "NCLOB": - case "BFILE": - case "BINARY_FLOAT": - case "BINARY_DOUBLE": - default: - return Type.UNSUPPORTED; - } - } - - public Type sqlserverTypeToDoris(JdbcFieldSchema fieldSchema) { - String originSqlserverType = fieldSchema.getDataTypeName(); - // For sqlserver IDENTITY type, such as 'INT IDENTITY' - // originSqlserverType is "int identity", so we only get "int". 
- String sqlserverType = originSqlserverType.split(" ")[0]; - switch (sqlserverType) { - case "bit": - return Type.BOOLEAN; - case "tinyint": - case "smallint": - return Type.SMALLINT; - case "int": - return Type.INT; - case "bigint": - return Type.BIGINT; - case "real": - return Type.FLOAT; - case "float": - return Type.DOUBLE; - case "money": - return ScalarType.createDecimalV3Type(19, 4); - case "smallmoney": - return ScalarType.createDecimalV3Type(10, 4); - case "decimal": - case "numeric": - int precision = fieldSchema.getColumnSize(); - int scale = fieldSchema.getDecimalDigits(); - return ScalarType.createDecimalV3Type(precision, scale); - case "date": - return ScalarType.createDateV2Type(); - case "datetime": - // datetime with millisecond precision - return ScalarType.createDatetimeV2Type(3); - case "datetime2": - // datetime2 with 100 nanoseconds precision, will lose precision - return ScalarType.createDatetimeV2Type(6); - case "smalldatetime": - // smalldatetime with second precision - return ScalarType.createDatetimeV2Type(0); - case "char": - case "varchar": - case "nchar": - case "nvarchar": - case "text": - case "ntext": - case "time": - case "datetimeoffset": - return ScalarType.createStringType(); - case "image": - case "binary": - case "varbinary": - default: - return Type.UNSUPPORTED; - } - } - - public Type saphanaTypeToDoris(JdbcFieldSchema fieldSchema) { - String hanaType = fieldSchema.getDataTypeName(); - switch (hanaType) { - case "TINYINT": - return Type.TINYINT; - case "SMALLINT": - return Type.SMALLINT; - case "INTEGER": - return Type.INT; - case "BIGINT": - return Type.BIGINT; - case "SMALLDECIMAL": - case "DECIMAL": { - int precision = fieldSchema.getColumnSize(); - int scale = fieldSchema.getDecimalDigits(); - return createDecimalOrStringType(precision, scale); - } - case "REAL": - return Type.FLOAT; - case "DOUBLE": - return Type.DOUBLE; - case "TIMESTAMP": - // TIMESTAMP with 100 nanoseconds precision, will lose precision - return 
ScalarType.createDatetimeV2Type(6); - case "SECONDDATE": - // SECONDDATE with second precision - return ScalarType.createDatetimeV2Type(0); - case "DATE": - return ScalarType.createDateV2Type(); - case "BOOLEAN": - return Type.BOOLEAN; - case "CHAR": - case "NCHAR": - ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); - charType.setLength(fieldSchema.columnSize); - return charType; - case "TIME": - case "VARCHAR": - case "NVARCHAR": - case "ALPHANUM": - case "SHORTTEXT": - return ScalarType.createStringType(); - case "BINARY": - case "VARBINARY": - case "BLOB": - case "CLOB": - case "NCLOB": - case "TEXT": - case "BINTEXT": - case "ST_GEOMETRY": - case "ST_POINT": - default: - return Type.UNSUPPORTED; - } - } - - public Type trinoTypeToDoris(JdbcFieldSchema fieldSchema) { - String trinoType = fieldSchema.getDataTypeName(); - if (trinoType.startsWith("decimal")) { - String[] split = trinoType.split("\\("); - String[] precisionAndScale = split[1].split(","); - int precision = Integer.parseInt(precisionAndScale[0]); - int scale = Integer.parseInt(precisionAndScale[1].substring(0, precisionAndScale[1].length() - 1)); - return createDecimalOrStringType(precision, scale); - } else if (trinoType.startsWith("char")) { - ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); - charType.setLength(fieldSchema.columnSize); - return charType; - } else if (trinoType.startsWith("timestamp")) { - // timestamp with picoseconds precision, will lose precision - return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); - } else if (trinoType.startsWith("array")) { - String trinoArrType = trinoType.substring(6, trinoType.length() - 1); - fieldSchema.setDataTypeName(trinoArrType); - Type type = trinoTypeToDoris(fieldSchema); - return ArrayType.create(type, true); - } else if (trinoType.startsWith("varchar")) { - return ScalarType.createStringType(); - } - switch (trinoType) { - case "integer": - return Type.INT; - case "bigint": - return Type.BIGINT; - case 
"smallint": - return Type.SMALLINT; - case "tinyint": - return Type.TINYINT; - case "double": - return Type.DOUBLE; - case "real": - return Type.FLOAT; - case "boolean": - return Type.BOOLEAN; - case "date": - return ScalarType.createDateV2Type(); - default: - return Type.UNSUPPORTED; - } - } - - private Type createDecimalOrStringType(int precision, int scale) { - if (precision <= ScalarType.MAX_DECIMAL128_PRECISION) { - return ScalarType.createDecimalV3Type(precision, scale); - } - return ScalarType.createStringType(); - } - - public List getColumnsFromJdbc(String dbName, String tableName) { List jdbcTableSchema = getJdbcColumnsInfo(dbName, tableName); List dorisTableSchema = Lists.newArrayListWithCapacity(jdbcTableSchema.size()); @@ -1001,4 +334,87 @@ public class JdbcClient { } return dorisTableSchema; } + + // protected methods,for subclass to override + protected String getCatalogName(Connection conn) throws SQLException { + return null; + } + + protected abstract String getDatabaseQuery(); + + protected List getSpecifiedDatabase(Connection conn) { + List databaseNames = Lists.newArrayList(); + try { + databaseNames.add(conn.getSchema()); + } catch (SQLException e) { + throw new JdbcClientException("failed to get specified database name from jdbc", e); + } finally { + close(conn); + } + return databaseNames; + } + + protected String[] getTableTypes() { + return new String[] {"TABLE", "VIEW"}; + } + + protected void processTable(String dbName, String tableName, String[] tableTypes, + Consumer resultSetConsumer) { + Connection conn = getConnection(); + ResultSet rs = null; + try { + DatabaseMetaData databaseMetaData = conn.getMetaData(); + String catalogName = getCatalogName(conn); + rs = databaseMetaData.getTables(catalogName, dbName, tableName, tableTypes); + resultSetConsumer.accept(rs); + } catch (SQLException e) { + throw new JdbcClientException("Failed to process table", e); + } finally { + close(rs, conn); + } + } + + protected String 
modifyTableNameIfNecessary(String tableName) { + return tableName; + } + + protected boolean isTableModified(String modifiedTableName, String actualTableName) { + return false; + } + + protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName, + String tableName) throws SQLException { + return databaseMetaData.getColumns(catalogName, schemaName, tableName, null); + } + + @Data + protected static class JdbcFieldSchema { + protected String columnName; + // The SQL type of the corresponding java.sql.types (Type ID) + protected int dataType; + // The SQL type of the corresponding java.sql.types (Type Name) + protected String dataTypeName; + // For CHAR/DATA, columnSize means the maximum number of chars. + // For NUMERIC/DECIMAL, columnSize means precision. + protected int columnSize; + protected int decimalDigits; + // Base number (usually 10 or 2) + protected int numPrecRadix; + // column description + protected String remarks; + // This length is the maximum number of bytes for CHAR type + // for utf8 encoding, if columnSize=10, then charOctetLength=30 + // because for utf8 encoding, a Chinese character takes up 3 bytes + protected int charOctetLength; + protected boolean isAllowNull; + } + + protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema); + + protected Type createDecimalOrStringType(int precision, int scale) { + if (precision <= ScalarType.MAX_DECIMAL128_PRECISION) { + return ScalarType.createDecimalV3Type(precision, scale); + } + return ScalarType.createStringType(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClientConfig.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClientConfig.java new file mode 100644 index 0000000000..787943b5f7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClientConfig.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.external.jdbc; + +import java.util.Map; + +public class JdbcClientConfig { + private String user; + private String password; + private String jdbcUrl; + private String driverUrl; + private String driverClass; + private String onlySpecifiedDatabase; + private String isLowerCaseTableNames; + private Map includeDatabaseMap; + private Map excludeDatabaseMap; + + public String getUser() { + return user; + } + + public JdbcClientConfig setUser(String user) { + this.user = user; + return this; + } + + public String getPassword() { + return password; + } + + public JdbcClientConfig setPassword(String password) { + this.password = password; + return this; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public JdbcClientConfig setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + return this; + } + + public String getDriverUrl() { + return driverUrl; + } + + public JdbcClientConfig setDriverUrl(String driverUrl) { + this.driverUrl = driverUrl; + return this; + } + + public String getDriverClass() { + return driverClass; + } + + public JdbcClientConfig setDriverClass(String driverClass) { + this.driverClass = driverClass; + return this; + } + + public String getOnlySpecifiedDatabase() { + return onlySpecifiedDatabase; + } + + 
public JdbcClientConfig setOnlySpecifiedDatabase(String onlySpecifiedDatabase) { + this.onlySpecifiedDatabase = onlySpecifiedDatabase; + return this; + } + + public String getIsLowerCaseTableNames() { + return isLowerCaseTableNames; + } + + public JdbcClientConfig setIsLowerCaseTableNames(String isLowerCaseTableNames) { + this.isLowerCaseTableNames = isLowerCaseTableNames; + return this; + } + + public Map getIncludeDatabaseMap() { + return includeDatabaseMap; + } + + public JdbcClientConfig setIncludeDatabaseMap(Map includeDatabaseMap) { + this.includeDatabaseMap = includeDatabaseMap; + return this; + } + + public Map getExcludeDatabaseMap() { + return excludeDatabaseMap; + } + + public JdbcClientConfig setExcludeDatabaseMap(Map excludeDatabaseMap) { + this.excludeDatabaseMap = excludeDatabaseMap; + return this; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java new file mode 100644 index 0000000000..d3c088cca7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcMySQLClient.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +import avro.shaded.com.google.common.collect.Lists; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.function.Consumer; + +public class JdbcMySQLClient extends JdbcClient { + protected JdbcMySQLClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SHOW DATABASES"; + } + + @Override + protected List getSpecifiedDatabase(Connection conn) { + List databaseNames = Lists.newArrayList(); + try { + databaseNames.add(conn.getCatalog()); + } catch (SQLException e) { + throw new JdbcClientException("failed to get specified database name from jdbc", e); + } finally { + close(conn); + } + return databaseNames; + } + + @Override + protected void processTable(String dbName, String tableName, String[] tableTypes, + Consumer resultSetConsumer) { + Connection conn = null; + ResultSet rs = null; + try { + conn = super.getConnection(); + DatabaseMetaData databaseMetaData = conn.getMetaData(); + rs = databaseMetaData.getTables(dbName, null, tableName, tableTypes); + resultSetConsumer.accept(rs); + } catch (SQLException e) { + throw new JdbcClientException("Failed to process table", e); + } finally { + close(rs, conn); + } + } + + @Override + protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName, + String tableName) throws SQLException { + return databaseMetaData.getColumns(schemaName, null, tableName, null); + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + // For mysql type: "INT UNSIGNED": + // fieldSchema.getDataTypeName().split(" ")[0] == "INT" + // fieldSchema.getDataTypeName().split(" ")[1] == "UNSIGNED" + String[] 
typeFields = fieldSchema.getDataTypeName().split(" "); + String mysqlType = typeFields[0]; + // For unsigned int, should extend the type. + if (typeFields.length > 1 && "UNSIGNED".equals(typeFields[1])) { + switch (mysqlType) { + case "TINYINT": + return Type.SMALLINT; + case "SMALLINT": + case "MEDIUMINT": + return Type.INT; + case "INT": + return Type.BIGINT; + case "BIGINT": + return Type.LARGEINT; + case "DECIMAL": + int precision = fieldSchema.getColumnSize() + 1; + int scale = fieldSchema.getDecimalDigits(); + return createDecimalOrStringType(precision, scale); + case "DOUBLE": + // As of MySQL 8.0.17, the UNSIGNED attribute is deprecated + // for columns of type FLOAT, DOUBLE, and DECIMAL (and any synonyms) + // https://dev.mysql.com/doc/refman/8.0/en/numeric-type-syntax.html + // The maximum value may cause errors due to insufficient accuracy + return Type.DOUBLE; + case "FLOAT": + return Type.FLOAT; + default: + throw new JdbcClientException("Unknown UNSIGNED type of mysql, type: [" + mysqlType + "]"); + } + } + switch (mysqlType) { + case "BOOLEAN": + return Type.BOOLEAN; + case "TINYINT": + return Type.TINYINT; + case "SMALLINT": + case "YEAR": + return Type.SMALLINT; + case "MEDIUMINT": + case "INT": + return Type.INT; + case "BIGINT": + return Type.BIGINT; + case "LARGEINT": // for jdbc catalog connecting Doris database + return Type.LARGEINT; + case "DATE": + case "DATEV2": + return ScalarType.createDateV2Type(); + case "TIMESTAMP": + case "DATETIME": + case "DATETIMEV2": // for jdbc catalog connecting Doris database + // mysql can support microsecond + // todo(gaoxin): Get real precision of DATETIMEV2 + return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); + case "FLOAT": + return Type.FLOAT; + case "DOUBLE": + return Type.DOUBLE; + case "DECIMAL": + case "DECIMALV3": // for jdbc catalog connecting Doris database + int precision = fieldSchema.getColumnSize(); + int scale = fieldSchema.getDecimalDigits(); + return 
createDecimalOrStringType(precision, scale); + case "CHAR": + ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); + charType.setLength(fieldSchema.columnSize); + return charType; + case "VARCHAR": + return ScalarType.createVarcharType(fieldSchema.columnSize); + case "TIME": + case "TINYTEXT": + case "TEXT": + case "MEDIUMTEXT": + case "LONGTEXT": + case "TINYBLOB": + case "BLOB": + case "MEDIUMBLOB": + case "LONGBLOB": + case "TINYSTRING": + case "STRING": + case "MEDIUMSTRING": + case "LONGSTRING": + case "JSON": + case "SET": + case "BIT": + case "BINARY": + case "VARBINARY": + case "ENUM": + return ScalarType.createStringType(); + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOceanBaseClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOceanBaseClient.java new file mode 100644 index 0000000000..70a594f855 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOceanBaseClient.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.catalog.Type; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +public class JdbcOceanBaseClient extends JdbcClient { + private JdbcClient currentClient; + + public JdbcOceanBaseClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + + try { + conn = super.getConnection(); + stmt = conn.createStatement(); + rs = stmt.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'"); + if (rs.next()) { + String compatibilityMode = rs.getString(2); + if ("MYSQL".equalsIgnoreCase(compatibilityMode)) { + currentClient = new JdbcMySQLClient(jdbcClientConfig); + } else if ("ORACLE".equalsIgnoreCase(compatibilityMode)) { + currentClient = new JdbcOracleClient(jdbcClientConfig); + setOracleMode(); + } else { + throw new JdbcClientException("Unsupported compatibility mode: " + compatibilityMode); + } + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to determine OceanBase compatibility mode", e); + } finally { + close(rs, stmt, conn); + } + } + + @Override + protected String getDatabaseQuery() { + return currentClient.getDatabaseQuery(); + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + return currentClient.jdbcTypeToDoris(fieldSchema); + } + + public void setOracleMode() { + this.dbType = JdbcResource.OCEANBASE_ORACLE; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOracleClient.java new file mode 100644 index 0000000000..5adfa63b9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcOracleClient.java @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +public class JdbcOracleClient extends JdbcClient { + + protected JdbcOracleClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SELECT DISTINCT OWNER FROM all_tables"; + } + + @Override + protected String modifyTableNameIfNecessary(String tableName) { + return tableName.replace("/", "%"); + } + + @Override + protected boolean isTableModified(String modifiedTableName, String actualTableName) { + return !modifiedTableName.equals(actualTableName); + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + String oracleType = fieldSchema.getDataTypeName(); + if (oracleType.startsWith("INTERVAL")) { + oracleType = oracleType.substring(0, 8); + } else if (oracleType.startsWith("TIMESTAMP")) { + if (oracleType.equals("TIMESTAMPTZ") || oracleType.equals("TIMESTAMPLTZ")) { + return Type.UNSUPPORTED; + } + // oracle can support nanosecond, will lose precision + return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); + } + switch (oracleType) { + /** + * The data type NUMBER(p,s) of oracle has some different of doris 
decimal type in semantics. + * For Oracle Number(p,s) type: + * 1. if s<0 , it means this is an Interger. + * This NUMBER(p,s) has (p+|s| ) significant digit, and rounding will be performed at s position. + * eg: if we insert 1234567 into NUMBER(5,-2) type, then the oracle will store 1234500. + * In this case, Doris will use INT type (TINYINT/SMALLINT/INT/.../LARGEINT). + * 2. if s>=0 && s

=0 && s>p, it means this is a decimal(like 0.xxxxx). + * p represents how many digits can be left to the left after the decimal point, + * the figure after the decimal point s will be rounded. + * eg: we can not insert 0.0123456 into NUMBER(5,7) type, + * because there must be two zeros on the right side of the decimal point, + * we can insert 0.0012345 into NUMBER(5,7) type. + * In this case, Doris will use DECIMAL(s,s) + * 4. if we don't specify p and s for NUMBER(p,s), just NUMBER, the p and s of NUMBER are uncertain. + * In this case, doris can not determine p and s, so doris can not determine data type. + */ + case "NUMBER": + int precision = fieldSchema.getColumnSize(); + int scale = fieldSchema.getDecimalDigits(); + if (scale <= 0) { + precision -= scale; + if (precision < 3) { + return Type.TINYINT; + } else if (precision < 5) { + return Type.SMALLINT; + } else if (precision < 10) { + return Type.INT; + } else if (precision < 19) { + return Type.BIGINT; + } else if (precision < 39) { + // LARGEINT supports up to 38 numbers. 
+ return Type.LARGEINT; + } else { + return ScalarType.createStringType(); + } + } + // scale > 0 + if (precision < scale) { + precision = scale; + } + return createDecimalOrStringType(precision, scale); + case "FLOAT": + return Type.DOUBLE; + case "DATE": + // can save date and time with second precision + return ScalarType.createDatetimeV2Type(0); + case "VARCHAR2": + case "NVARCHAR2": + case "CHAR": + case "NCHAR": + case "LONG": + case "RAW": + case "LONG RAW": + case "INTERVAL": + return ScalarType.createStringType(); + case "BLOB": + case "CLOB": + case "NCLOB": + case "BFILE": + case "BINARY_FLOAT": + case "BINARY_DOUBLE": + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcPostgreSQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcPostgreSQLClient.java new file mode 100644 index 0000000000..26eba627f4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcPostgreSQLClient.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +public class JdbcPostgreSQLClient extends JdbcClient { + + protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SELECT nspname FROM pg_namespace WHERE has_schema_privilege(" + + "'" + jdbcUser + "', nspname, 'USAGE');"; + } + + @Override + protected String[] getTableTypes() { + return new String[] {"TABLE", "VIEW", "MATERIALIZED VIEW", "FOREIGN TABLE"}; + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + String pgType = fieldSchema.getDataTypeName(); + switch (pgType) { + case "int2": + case "smallserial": + return Type.SMALLINT; + case "int4": + case "serial": + return Type.INT; + case "int8": + case "bigserial": + return Type.BIGINT; + case "numeric": { + int precision = fieldSchema.getColumnSize(); + int scale = fieldSchema.getDecimalDigits(); + return createDecimalOrStringType(precision, scale); + } + case "float4": + return Type.FLOAT; + case "float8": + return Type.DOUBLE; + case "bpchar": + ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); + charType.setLength(fieldSchema.columnSize); + return charType; + case "timestamp": + case "timestamptz": + // postgres can support microsecond + return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); + case "date": + return ScalarType.createDateV2Type(); + case "bool": + return Type.BOOLEAN; + case "bit": + if (fieldSchema.getColumnSize() == 1) { + return Type.BOOLEAN; + } else { + return ScalarType.createStringType(); + } + case "point": + case "line": + case "lseg": + case "box": + case "path": + case "polygon": + case "circle": + case "varchar": + case "text": + case "time": + case "timetz": + case "interval": + case "cidr": + case "inet": + case "macaddr": + case "varbit": + case 
"jsonb": + case "uuid": + case "bytea": + return ScalarType.createStringType(); + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSQLServerClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSQLServerClient.java new file mode 100644 index 0000000000..6fe737634e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSQLServerClient.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +public class JdbcSQLServerClient extends JdbcClient { + + protected JdbcSQLServerClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SELECT name FROM sys.schemas"; + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + String originSqlserverType = fieldSchema.getDataTypeName(); + // For sqlserver IDENTITY type, such as 'INT IDENTITY' + // originSqlserverType is "int identity", so we only get "int". 
+ String sqlserverType = originSqlserverType.split(" ")[0]; + switch (sqlserverType) { + case "bit": + return Type.BOOLEAN; + case "tinyint": + case "smallint": + return Type.SMALLINT; + case "int": + return Type.INT; + case "bigint": + return Type.BIGINT; + case "real": + return Type.FLOAT; + case "float": + return Type.DOUBLE; + case "money": + return ScalarType.createDecimalV3Type(19, 4); + case "smallmoney": + return ScalarType.createDecimalV3Type(10, 4); + case "decimal": + case "numeric": + int precision = fieldSchema.getColumnSize(); + int scale = fieldSchema.getDecimalDigits(); + return ScalarType.createDecimalV3Type(precision, scale); + case "date": + return ScalarType.createDateV2Type(); + case "datetime": + // datetime with millisecond precision + return ScalarType.createDatetimeV2Type(3); + case "datetime2": + // datetime2 with 100 nanoseconds precision, will lose precision + return ScalarType.createDatetimeV2Type(6); + case "smalldatetime": + // smalldatetime with second precision + return ScalarType.createDatetimeV2Type(0); + case "char": + case "varchar": + case "nchar": + case "nvarchar": + case "text": + case "ntext": + case "time": + case "datetimeoffset": + return ScalarType.createStringType(); + case "image": + case "binary": + case "varbinary": + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSapHanaClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSapHanaClient.java new file mode 100644 index 0000000000..bbd322675e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcSapHanaClient.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +public class JdbcSapHanaClient extends JdbcClient { + protected JdbcSapHanaClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SELECT SCHEMA_NAME FROM SYS.SCHEMAS WHERE HAS_PRIVILEGES = 'TRUE'"; + } + + @Override + protected String[] getTableTypes() { + return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + String hanaType = fieldSchema.getDataTypeName(); + switch (hanaType) { + case "TINYINT": + return Type.TINYINT; + case "SMALLINT": + return Type.SMALLINT; + case "INTEGER": + return Type.INT; + case "BIGINT": + return Type.BIGINT; + case "SMALLDECIMAL": + case "DECIMAL": { + int precision = fieldSchema.getColumnSize(); + int scale = fieldSchema.getDecimalDigits(); + return createDecimalOrStringType(precision, scale); + } + case "REAL": + return Type.FLOAT; + case "DOUBLE": + return Type.DOUBLE; + case "TIMESTAMP": + // TIMESTAMP with 100 nanoseconds precision, will lose precision + return ScalarType.createDatetimeV2Type(6); + case "SECONDDATE": + // SECONDDATE with second precision + return 
ScalarType.createDatetimeV2Type(0); + case "DATE": + return ScalarType.createDateV2Type(); + case "BOOLEAN": + return Type.BOOLEAN; + case "CHAR": + case "NCHAR": + ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); + charType.setLength(fieldSchema.columnSize); + return charType; + case "TIME": + case "VARCHAR": + case "NVARCHAR": + case "ALPHANUM": + case "SHORTTEXT": + return ScalarType.createStringType(); + case "BINARY": + case "VARBINARY": + case "BLOB": + case "CLOB": + case "NCLOB": + case "TEXT": + case "BINTEXT": + case "ST_GEOMETRY": + case "ST_POINT": + default: + return Type.UNSUPPORTED; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcTrinoClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcTrinoClient.java new file mode 100644 index 0000000000..f971fc540c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcTrinoClient.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.jdbc; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +import java.sql.Connection; +import java.sql.SQLException; + +public class JdbcTrinoClient extends JdbcClient { + protected JdbcTrinoClient(JdbcClientConfig jdbcClientConfig) { + super(jdbcClientConfig); + } + + @Override + protected String getDatabaseQuery() { + return "SHOW SCHEMAS"; + } + + @Override + protected String getCatalogName(Connection conn) throws SQLException { + return conn.getCatalog(); + } + + @Override + protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { + String trinoType = fieldSchema.getDataTypeName(); + switch (trinoType) { + case "integer": + return Type.INT; + case "bigint": + return Type.BIGINT; + case "smallint": + return Type.SMALLINT; + case "tinyint": + return Type.TINYINT; + case "double": + return Type.DOUBLE; + case "real": + return Type.FLOAT; + case "boolean": + return Type.BOOLEAN; + case "date": + return ScalarType.createDateV2Type(); + default: + break; + } + + if (trinoType.startsWith("decimal")) { + String[] split = trinoType.split("\\("); + String[] precisionAndScale = split[1].split(","); + int precision = Integer.parseInt(precisionAndScale[0]); + int scale = Integer.parseInt(precisionAndScale[1].substring(0, precisionAndScale[1].length() - 1)); + return createDecimalOrStringType(precision, scale); + } + + if (trinoType.startsWith("char")) { + ScalarType charType = ScalarType.createType(PrimitiveType.CHAR); + charType.setLength(fieldSchema.columnSize); + return charType; + } + + if (trinoType.startsWith("timestamp")) { + // timestamp with picoseconds precision, will lose precision + return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE); + } + + if (trinoType.startsWith("array")) { + String trinoArrType = trinoType.substring(6, trinoType.length() - 1); + 
fieldSchema.setDataTypeName(trinoArrType); + Type type = jdbcTypeToDoris(fieldSchema); + return ArrayType.create(type, true); + } + + if (trinoType.startsWith("varchar")) { + return ScalarType.createStringType(); + } + + return Type.UNSUPPORTED; + } +} diff --git a/regression-test/data/jdbc_catalog_p0/test_clickhouse_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_clickhouse_jdbc_catalog.out index 2864fb423534f4de61bd4b186e2f2502a9b6c930..540500dbef3bbc26033b4fdd6cf8e922778c62bb 100644 GIT binary patch delta 134 zcmdnRy@h+jN)`_#L#0^GXhSp%5rvROdPWAZoY6`dsX00MAjL}hhK6Q(MyAGkCI)(j u#>QZ=$sbr`SRpJyRuyRw3uLHLaY<2TUOI>iV@?iW6`LHuDmS^CRUZK6Ng{Lr delta 137 zcmdnOy^DLpN)~r@L-knBXhSp%5rvROdPcFF(drqgIXU?twd(qYhGu$3rp9_E26~3Z x#$d6@A6aBrAuIt#6=@I)WTtv?Nl|8AI*1EnP7Y)hn_SDvF*%4;baE}LIRI|6BdP!Z diff --git a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out index 62fb42f033..8cb2b59185 100644 --- a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out @@ -185,11 +185,7 @@ true abcHa1.12345 1.123450xkalowadawd 2022-10-01 3.14159 1 2 0 100000 1.2345678 doris_test -- !specified_database_2 -- -information_schema -init_db -mysql -performance_schema -sys +doris_test -- !specified_database_3 -- information_schema @@ -199,11 +195,6 @@ performance_schema sys -- !specified_database_4 -- -information_schema -init_db -mysql -performance_schema -sys -- !ex_tb1 -- {"k1":"v1", "k2":"v2"} diff --git a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out index 781befc521..b9326d87f9 100644 --- a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out @@ -2161,11 +2161,7 @@ doris3 20 doris_test -- !specified_database_2 -- -catalog_pg_test -information_schema -pg_catalog -pg_toast 
-public +doris_test -- !specified_database_3 -- catalog_pg_test @@ -2175,11 +2171,6 @@ pg_toast public -- !specified_database_4 -- -catalog_pg_test -information_schema -pg_catalog -pg_toast -public -- !test_old -- 123 abc