[refactor](jdbc catalog) Refactor the JdbcClient code (#20109)

This PR does the following:

1. This PR is a substantial refactor of the JDBC client architecture. The previous monolithic JDBC client has been refactored into an abstract base class `JdbcClient`, and a set of database-specific subclasses (e.g., `JdbcMySQLClient`, `JdbcOracleClient`, etc.), and the JdbcClient required config, abstract into an object. This allows for improved modularity, easier addition of support for new databases, and cleaner, more maintainable code. This change is backward-compatible and does not affect existing functionality.
2. As a result of client refactoring, OceanBaseClient can automatically recognize the mode of operation as MySQL or Oracle, so we cancel the oceanbase_mode property in the Jdbc Catalog, but due to the cancellation of the property, When creating a single OceanBase Jdbc Table, the table type needs to be filled in as oceanbase(mysql mode) or oceanbase_oracle(oracle_mode). The above work is a change in the usage behavior, please note.
3. For the PostgreSQL Jdbc Catalog, I did two things:

      1.   The adaptation to MATERIALIZED VIEW and FOREIGN TABLE is added
      2.   Fixed reading jsonb, which had been incorrectly changed to json in a previous PR

4. fix some jdbc catalog test case
5. modify oceanbase jdbc doc

And,Thanks @wolfboys for the guidance
This commit is contained in:
zy-kkk
2023-06-02 17:58:10 +08:00
committed by GitHub
parent c2121c831a
commit a20a6d2bea
22 changed files with 1242 additions and 885 deletions

View File

@ -251,10 +251,10 @@ properties (
"password"="",
"jdbc_url" = "jdbc:oceanbase://localhost:2881/test",
"driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql" or "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
);
mysql mode
CREATE EXTERNAL TABLE `ext_oceanbase` (
`k1` int
) ENGINE=JDBC
@ -263,6 +263,16 @@ PROPERTIES (
"table" = "test.test",
"table_type"="oceanbase"
);
oracle mode
CREATE EXTERNAL TABLE `ext_oceanbase` (
`k1` int
) ENGINE=JDBC
PROPERTIES (
"resource" = "jdbc_oceanbase",
"table" = "test.test",
"table_type"="oceanbase_oracle"
);
```
### 9.NebulaGraphTest (only supports queries)

View File

@ -214,18 +214,7 @@ CREATE CATALOG jdbc_oceanbase_mysql PROPERTIES (
"password"="123456",
"jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo",
"driver_url" = "oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql"
)
CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo",
"driver_url" = "oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
)
```
@ -240,9 +229,9 @@ CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES (
| `driver_class ` | Yes | | JDBC Driver Class |
| `only_specified_database` | No | "false" | Whether only the database specified to be synchronized. |
| `lower_case_table_names` | No | "false" | Whether to synchronize jdbc external data source table names in lower case. |
| `oceanbase_mode` | No | "" | When the connected external data source is OceanBase, the mode must be specified as mysql or oracle |
| `include_database_list` | No | "" | When only_specified_database=true,only synchronize the specified databases. split with ','. db name is case sensitive. |
| `exclude_database_list` | No | "" | When only_specified_database=true,do not synchronize the specified databases. split with ','. db name is case sensitive. |
> `driver_url` can be specified in three ways:
>
> 1. File name. For example, `mysql-connector-java-5.1.47.jar`. Please place the Jar file package in `jdbc_drivers/` under the FE/BE deployment directory in advance so the system can locate the file. You can change the location of the file by modifying `jdbc_drivers_dir` in fe.conf and be.conf.

View File

@ -163,8 +163,7 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name
"password"="",
"jdbc_url" = "jdbc:oceanbase://localhost:2881/demo",
"driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql" or "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
);
```

View File

@ -245,10 +245,10 @@ properties (
"password"="",
"jdbc_url" = "jdbc:oceanbase://localhost:2881/test",
"driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql" or "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
);
mysql模式
CREATE EXTERNAL TABLE `ext_oceanbase` (
`k1` int
) ENGINE=JDBC
@ -257,6 +257,16 @@ PROPERTIES (
"table" = "test.test",
"table_type"="oceanbase"
);
oracle模式
CREATE EXTERNAL TABLE `ext_oceanbase` (
`k1` int
) ENGINE=JDBC
PROPERTIES (
"resource" = "jdbc_oceanbase",
"table" = "test.test",
"table_type"="oceanbase_oracle"
);
```
### 9.Nebula-graph测试 (仅支持查询)

View File

@ -215,18 +215,7 @@ CREATE CATALOG jdbc_oceanbase_mysql PROPERTIES (
"password"="123456",
"jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo",
"driver_url" = "oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql"
)
CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:oceanbase://127.0.0.1:2881/demo",
"driver_url" = "oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
)
```
@ -241,7 +230,6 @@ CREATE CATALOG jdbc_oceanbase_oracle PROPERTIES (
| `driver_class` | 是 | | JDBC Driver Class 名称 |
| `only_specified_database` | 否 | "false" | 指定是否只同步指定的 database |
| `lower_case_table_names` | 否 | "false" | 是否以小写的形式同步jdbc外部数据源的表名 |
| `oceanbase_mode` | 否 | "" | 当连接的外部数据源为OceanBase时,必须为其指定模式为mysql或oracle |
| `include_database_list` | 否 | "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 |
| `exclude_database_list` | 否 | "" | 当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。|

View File

@ -167,8 +167,7 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name
"password"="",
"jdbc_url" = "jdbc:oceanbase://localhost:2881/demo",
"driver_url" = "file:///path/to/oceanbase-client-2.4.2.jar",
"driver_class" = "com.oceanbase.jdbc.Driver",
"oceanbase_mode" = "mysql" or "oracle"
"driver_class" = "com.oceanbase.jdbc.Driver"
);
```

View File

@ -94,7 +94,6 @@ public class JdbcResource extends Resource {
public static final String TYPE = "type";
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
public static final String OCEANBASE_MODE = "oceanbase_mode";
public static final String CHECK_SUM = "checksum";
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
@ -105,14 +104,12 @@ public class JdbcResource extends Resource {
TYPE,
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
OCEANBASE_MODE,
INCLUDE_DATABASE_LIST,
EXCLUDE_DATABASE_LIST
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
OCEANBASE_MODE,
INCLUDE_DATABASE_LIST,
EXCLUDE_DATABASE_LIST
).build();
@ -124,7 +121,6 @@ public class JdbcResource extends Resource {
static {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(OCEANBASE_MODE, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
}
@ -153,7 +149,7 @@ public class JdbcResource extends Resource {
for (String propertyKey : ALL_PROPERTIES) {
replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey));
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE)));
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
super.modifyProperties(properties);
}
@ -186,7 +182,7 @@ public class JdbcResource extends Resource {
throw new DdlException("JdbcResource Missing " + property + " in properties");
}
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE)));
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
}
@ -269,7 +265,7 @@ public class JdbcResource extends Resource {
}
}
public static String parseDbType(String url, String oceanbaseMode) throws DdlException {
public static String parseDbType(String url) throws DdlException {
if (url.startsWith(JDBC_MYSQL) || url.startsWith(JDBC_MARIADB)) {
return MYSQL;
} else if (url.startsWith(JDBC_POSTGRESQL)) {
@ -287,27 +283,17 @@ public class JdbcResource extends Resource {
} else if (url.startsWith(JDBC_PRESTO)) {
return PRESTO;
} else if (url.startsWith(JDBC_OCEANBASE)) {
if (oceanbaseMode == null || oceanbaseMode.isEmpty()) {
throw new DdlException("OceanBase mode must be specified for OceanBase databases"
+ "(either 'mysql' or 'oracle')");
}
if (oceanbaseMode.equalsIgnoreCase("mysql")) {
return OCEANBASE;
} else if (oceanbaseMode.equalsIgnoreCase("oracle")) {
return OCEANBASE_ORACLE;
} else {
throw new DdlException("Invalid OceanBase mode: " + oceanbaseMode + ". Must be 'mysql' or 'oracle'");
}
return OCEANBASE;
} else if (url.startsWith(JDBC_NEBULA)) {
return NEBULA;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}
public static String handleJdbcUrl(String jdbcUrl, String oceanbaseMode) throws DdlException {
public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
// delete all space in jdbcUrl
String newJdbcUrl = jdbcUrl.replaceAll(" ", "");
String dbType = parseDbType(newJdbcUrl, oceanbaseMode);
String dbType = parseDbType(newJdbcUrl);
if (dbType.equals(MYSQL) || dbType.equals(OCEANBASE)) {
// `yearIsDateType` is a parameter of JDBC, and the default is true.
// We force the use of `yearIsDateType=false`

View File

@ -39,7 +39,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -287,10 +286,7 @@ public class JdbcTable extends Table {
throw new DdlException("property " + TABLE_TYPE + " must be set");
}
Map<String, TOdbcTableType> tableTypeMapWithoutOceanbaseOracle = new HashMap<>(TABLE_TYPE_MAP);
tableTypeMapWithoutOceanbaseOracle.remove("oceanbase_oracle");
if (!tableTypeMapWithoutOceanbaseOracle.containsKey(jdbcTypeName.toLowerCase())) {
if (!TABLE_TYPE_MAP.containsKey(jdbcTypeName.toLowerCase())) {
throw new DdlException("Unknown jdbc table type: " + jdbcTypeName);
}
@ -310,21 +306,10 @@ public class JdbcTable extends Table {
driverUrl = jdbcResource.getProperty(DRIVER_URL);
checkSum = jdbcResource.getProperty(CHECK_SUM);
if (!jdbcTypeName.equalsIgnoreCase(jdbcUrl.split(":")[1])) {
throw new DdlException("property " + TABLE_TYPE + " must be same with resource url");
}
// get oceanbase_mode
String oceanbaseMode = jdbcResource.getProperty("oceanbase_mode");
// by oceanbase_mode set jdbcTypeName
if ("oceanbase".equalsIgnoreCase(jdbcTypeName)) {
if ("mysql".equalsIgnoreCase(oceanbaseMode)) {
jdbcTypeName = "oceanbase";
} else if ("oracle".equalsIgnoreCase(oceanbaseMode)) {
jdbcTypeName = "oceanbase_oracle";
} else {
throw new DdlException("Unknown oceanbase_mode: " + oceanbaseMode);
String urlType = jdbcUrl.split(":")[1];
if (!jdbcTypeName.equalsIgnoreCase(urlType)) {
if (!(jdbcTypeName.equalsIgnoreCase("oceanbase_oracle") && urlType.equalsIgnoreCase("oceanbase"))) {
throw new DdlException("property " + TABLE_TYPE + " must be same with resource url");
}
}
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.common.DdlException;
import org.apache.doris.external.jdbc.JdbcClient;
import org.apache.doris.external.jdbc.JdbcClientConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -78,9 +79,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
properties.put(StringUtils.removeStart(kv.getKey(), JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue());
}
String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
String oceanbaseMode = properties.getOrDefault(JdbcResource.OCEANBASE_MODE, "");
if (!Strings.isNullOrEmpty(jdbcUrl)) {
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl, oceanbaseMode);
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
properties.put(JdbcResource.JDBC_URL, jdbcUrl);
}
@ -127,15 +127,20 @@ public class JdbcExternalCatalog extends ExternalCatalog {
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
}
public String getOceanBaseMode() {
return catalogProperty.getOrDefault(JdbcResource.OCEANBASE_MODE, "");
}
@Override
protected void initLocalObjectsImpl() {
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(),
getDriverClass(), getOnlySpecifiedDatabase(), getLowerCaseTableNames(),
getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap());
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig()
.setUser(getJdbcUser())
.setPassword(getJdbcPasswd())
.setJdbcUrl(getJdbcUrl())
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseTableNames(getLowerCaseTableNames())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap());
jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
}
protected List<String> listDatabaseNames() {

View File

@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcClickHouseClient extends JdbcClient {
protected JdbcClickHouseClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SHOW DATABASES";
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String ckType = fieldSchema.getDataTypeName();
if (ckType.startsWith("LowCardinality")) {
ckType = ckType.substring(15, ckType.length() - 1);
if (ckType.startsWith("Nullable")) {
ckType = ckType.substring(9, ckType.length() - 1);
}
} else if (ckType.startsWith("Nullable")) {
ckType = ckType.substring(9, ckType.length() - 1);
}
if (ckType.startsWith("Decimal")) {
String[] accuracy = ckType.substring(8, ckType.length() - 1).split(", ");
int precision = Integer.parseInt(accuracy[0]);
int scale = Integer.parseInt(accuracy[1]);
return createDecimalOrStringType(precision, scale);
}
if ("String".contains(ckType) || ckType.startsWith("Enum")
|| ckType.startsWith("IPv") || "UUID".contains(ckType)
|| ckType.startsWith("FixedString")) {
return ScalarType.createStringType();
}
if (ckType.startsWith("DateTime")) {
// DateTime with second precision, DateTime64 with [0~9] precision
if (ckType.equals("DateTime")) {
return ScalarType.createDatetimeV2Type(0);
} else {
// will lose precision
return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE);
}
}
if (ckType.startsWith("Array")) {
String cktype = ckType.substring(6, ckType.length() - 1);
fieldSchema.setDataTypeName(cktype);
Type type = jdbcTypeToDoris(fieldSchema);
return ArrayType.create(type, true);
}
switch (ckType) {
case "Bool":
return Type.BOOLEAN;
case "Int8":
return Type.TINYINT;
case "Int16":
case "UInt8":
return Type.SMALLINT;
case "Int32":
case "UInt16":
return Type.INT;
case "Int64":
case "UInt32":
return Type.BIGINT;
case "Int128":
case "UInt64":
return Type.LARGEINT;
case "Int256":
case "UInt128":
case "UInt256":
return ScalarType.createStringType();
case "Float32":
return Type.FLOAT;
case "Float64":
return Type.DOUBLE;
case "Date":
case "Date32":
return ScalarType.createDateV2Type();
default:
return Type.UNSUPPORTED;
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import java.util.Map;
public class JdbcClientConfig {
private String user;
private String password;
private String jdbcUrl;
private String driverUrl;
private String driverClass;
private String onlySpecifiedDatabase;
private String isLowerCaseTableNames;
private Map<String, Boolean> includeDatabaseMap;
private Map<String, Boolean> excludeDatabaseMap;
public String getUser() {
return user;
}
public JdbcClientConfig setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
public JdbcClientConfig setPassword(String password) {
this.password = password;
return this;
}
public String getJdbcUrl() {
return jdbcUrl;
}
public JdbcClientConfig setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}
public String getDriverUrl() {
return driverUrl;
}
public JdbcClientConfig setDriverUrl(String driverUrl) {
this.driverUrl = driverUrl;
return this;
}
public String getDriverClass() {
return driverClass;
}
public JdbcClientConfig setDriverClass(String driverClass) {
this.driverClass = driverClass;
return this;
}
public String getOnlySpecifiedDatabase() {
return onlySpecifiedDatabase;
}
public JdbcClientConfig setOnlySpecifiedDatabase(String onlySpecifiedDatabase) {
this.onlySpecifiedDatabase = onlySpecifiedDatabase;
return this;
}
public String getIsLowerCaseTableNames() {
return isLowerCaseTableNames;
}
public JdbcClientConfig setIsLowerCaseTableNames(String isLowerCaseTableNames) {
this.isLowerCaseTableNames = isLowerCaseTableNames;
return this;
}
public Map<String, Boolean> getIncludeDatabaseMap() {
return includeDatabaseMap;
}
public JdbcClientConfig setIncludeDatabaseMap(Map<String, Boolean> includeDatabaseMap) {
this.includeDatabaseMap = includeDatabaseMap;
return this;
}
public Map<String, Boolean> getExcludeDatabaseMap() {
return excludeDatabaseMap;
}
public JdbcClientConfig setExcludeDatabaseMap(Map<String, Boolean> excludeDatabaseMap) {
this.excludeDatabaseMap = excludeDatabaseMap;
return this;
}
}

View File

@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import avro.shaded.com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.function.Consumer;
public class JdbcMySQLClient extends JdbcClient {
protected JdbcMySQLClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SHOW DATABASES";
}
@Override
protected List<String> getSpecifiedDatabase(Connection conn) {
List<String> databaseNames = Lists.newArrayList();
try {
databaseNames.add(conn.getCatalog());
} catch (SQLException e) {
throw new JdbcClientException("failed to get specified database name from jdbc", e);
} finally {
close(conn);
}
return databaseNames;
}
@Override
protected void processTable(String dbName, String tableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
rs = databaseMetaData.getTables(dbName, null, tableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
} finally {
close(rs, conn);
}
}
@Override
protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName,
String tableName) throws SQLException {
return databaseMetaData.getColumns(schemaName, null, tableName, null);
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
// For mysql type: "INT UNSIGNED":
// fieldSchema.getDataTypeName().split(" ")[0] == "INT"
// fieldSchema.getDataTypeName().split(" ")[1] == "UNSIGNED"
String[] typeFields = fieldSchema.getDataTypeName().split(" ");
String mysqlType = typeFields[0];
// For unsigned int, should extend the type.
if (typeFields.length > 1 && "UNSIGNED".equals(typeFields[1])) {
switch (mysqlType) {
case "TINYINT":
return Type.SMALLINT;
case "SMALLINT":
case "MEDIUMINT":
return Type.INT;
case "INT":
return Type.BIGINT;
case "BIGINT":
return Type.LARGEINT;
case "DECIMAL":
int precision = fieldSchema.getColumnSize() + 1;
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
case "DOUBLE":
// As of MySQL 8.0.17, the UNSIGNED attribute is deprecated
// for columns of type FLOAT, DOUBLE, and DECIMAL (and any synonyms)
// https://dev.mysql.com/doc/refman/8.0/en/numeric-type-syntax.html
// The maximum value may cause errors due to insufficient accuracy
return Type.DOUBLE;
case "FLOAT":
return Type.FLOAT;
default:
throw new JdbcClientException("Unknown UNSIGNED type of mysql, type: [" + mysqlType + "]");
}
}
switch (mysqlType) {
case "BOOLEAN":
return Type.BOOLEAN;
case "TINYINT":
return Type.TINYINT;
case "SMALLINT":
case "YEAR":
return Type.SMALLINT;
case "MEDIUMINT":
case "INT":
return Type.INT;
case "BIGINT":
return Type.BIGINT;
case "LARGEINT": // for jdbc catalog connecting Doris database
return Type.LARGEINT;
case "DATE":
case "DATEV2":
return ScalarType.createDateV2Type();
case "TIMESTAMP":
case "DATETIME":
case "DATETIMEV2": // for jdbc catalog connecting Doris database
// mysql can support microsecond
// todo(gaoxin): Get real precision of DATETIMEV2
return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE);
case "FLOAT":
return Type.FLOAT;
case "DOUBLE":
return Type.DOUBLE;
case "DECIMAL":
case "DECIMALV3": // for jdbc catalog connecting Doris database
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
case "CHAR":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "VARCHAR":
return ScalarType.createVarcharType(fieldSchema.columnSize);
case "TIME":
case "TINYTEXT":
case "TEXT":
case "MEDIUMTEXT":
case "LONGTEXT":
case "TINYBLOB":
case "BLOB":
case "MEDIUMBLOB":
case "LONGBLOB":
case "TINYSTRING":
case "STRING":
case "MEDIUMSTRING":
case "LONGSTRING":
case "JSON":
case "SET":
case "BIT":
case "BINARY":
case "VARBINARY":
case "ENUM":
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.Type;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class JdbcOceanBaseClient extends JdbcClient {
private JdbcClient currentClient;
public JdbcOceanBaseClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = super.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'");
if (rs.next()) {
String compatibilityMode = rs.getString(2);
if ("MYSQL".equalsIgnoreCase(compatibilityMode)) {
currentClient = new JdbcMySQLClient(jdbcClientConfig);
} else if ("ORACLE".equalsIgnoreCase(compatibilityMode)) {
currentClient = new JdbcOracleClient(jdbcClientConfig);
setOracleMode();
} else {
throw new JdbcClientException("Unsupported compatibility mode: " + compatibilityMode);
}
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to determine OceanBase compatibility mode", e);
} finally {
close(rs, stmt, conn);
}
}
@Override
protected String getDatabaseQuery() {
return currentClient.getDatabaseQuery();
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
return currentClient.jdbcTypeToDoris(fieldSchema);
}
public void setOracleMode() {
this.dbType = JdbcResource.OCEANBASE_ORACLE;
}
}

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcOracleClient extends JdbcClient {
protected JdbcOracleClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SELECT DISTINCT OWNER FROM all_tables";
}
@Override
protected String modifyTableNameIfNecessary(String tableName) {
return tableName.replace("/", "%");
}
@Override
protected boolean isTableModified(String modifiedTableName, String actualTableName) {
return !modifiedTableName.equals(actualTableName);
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String oracleType = fieldSchema.getDataTypeName();
if (oracleType.startsWith("INTERVAL")) {
oracleType = oracleType.substring(0, 8);
} else if (oracleType.startsWith("TIMESTAMP")) {
if (oracleType.equals("TIMESTAMPTZ") || oracleType.equals("TIMESTAMPLTZ")) {
return Type.UNSUPPORTED;
}
// oracle can support nanosecond, will lose precision
return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE);
}
switch (oracleType) {
/**
* The data type NUMBER(p,s) of oracle has some different of doris decimal type in semantics.
* For Oracle Number(p,s) type:
* 1. if s<0 , it means this is an Interger.
* This NUMBER(p,s) has (p+|s| ) significant digit, and rounding will be performed at s position.
* eg: if we insert 1234567 into NUMBER(5,-2) type, then the oracle will store 1234500.
* In this case, Doris will use INT type (TINYINT/SMALLINT/INT/.../LARGEINT).
* 2. if s>=0 && s<p , it just like doris Decimal(p,s) behavior.
* 3. if s>=0 && s>p, it means this is a decimal(like 0.xxxxx).
* p represents how many digits can be left to the left after the decimal point,
* the figure after the decimal point s will be rounded.
* eg: we can not insert 0.0123456 into NUMBER(5,7) type,
* because there must be two zeros on the right side of the decimal point,
* we can insert 0.0012345 into NUMBER(5,7) type.
* In this case, Doris will use DECIMAL(s,s)
* 4. if we don't specify p and s for NUMBER(p,s), just NUMBER, the p and s of NUMBER are uncertain.
* In this case, doris can not determine p and s, so doris can not determine data type.
*/
case "NUMBER":
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
if (scale <= 0) {
precision -= scale;
if (precision < 3) {
return Type.TINYINT;
} else if (precision < 5) {
return Type.SMALLINT;
} else if (precision < 10) {
return Type.INT;
} else if (precision < 19) {
return Type.BIGINT;
} else if (precision < 39) {
// LARGEINT supports up to 38 numbers.
return Type.LARGEINT;
} else {
return ScalarType.createStringType();
}
}
// scale > 0
if (precision < scale) {
precision = scale;
}
return createDecimalOrStringType(precision, scale);
case "FLOAT":
return Type.DOUBLE;
case "DATE":
// can save date and time with second precision
return ScalarType.createDatetimeV2Type(0);
case "VARCHAR2":
case "NVARCHAR2":
case "CHAR":
case "NCHAR":
case "LONG":
case "RAW":
case "LONG RAW":
case "INTERVAL":
return ScalarType.createStringType();
case "BLOB":
case "CLOB":
case "NCLOB":
case "BFILE":
case "BINARY_FLOAT":
case "BINARY_DOUBLE":
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcPostgreSQLClient extends JdbcClient {
protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SELECT nspname FROM pg_namespace WHERE has_schema_privilege("
+ "'" + jdbcUser + "', nspname, 'USAGE');";
}
@Override
protected String[] getTableTypes() {
return new String[] {"TABLE", "VIEW", "MATERIALIZED VIEW", "FOREIGN TABLE"};
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String pgType = fieldSchema.getDataTypeName();
switch (pgType) {
case "int2":
case "smallserial":
return Type.SMALLINT;
case "int4":
case "serial":
return Type.INT;
case "int8":
case "bigserial":
return Type.BIGINT;
case "numeric": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
}
case "float4":
return Type.FLOAT;
case "float8":
return Type.DOUBLE;
case "bpchar":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "timestamp":
case "timestamptz":
// postgres can support microsecond
return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE);
case "date":
return ScalarType.createDateV2Type();
case "bool":
return Type.BOOLEAN;
case "bit":
if (fieldSchema.getColumnSize() == 1) {
return Type.BOOLEAN;
} else {
return ScalarType.createStringType();
}
case "point":
case "line":
case "lseg":
case "box":
case "path":
case "polygon":
case "circle":
case "varchar":
case "text":
case "time":
case "timetz":
case "interval":
case "cidr":
case "inet":
case "macaddr":
case "varbit":
case "jsonb":
case "uuid":
case "bytea":
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcSQLServerClient extends JdbcClient {
protected JdbcSQLServerClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SELECT name FROM sys.schemas";
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String originSqlserverType = fieldSchema.getDataTypeName();
// For sqlserver IDENTITY type, such as 'INT IDENTITY'
// originSqlserverType is "int identity", so we only get "int".
String sqlserverType = originSqlserverType.split(" ")[0];
switch (sqlserverType) {
case "bit":
return Type.BOOLEAN;
case "tinyint":
case "smallint":
return Type.SMALLINT;
case "int":
return Type.INT;
case "bigint":
return Type.BIGINT;
case "real":
return Type.FLOAT;
case "float":
return Type.DOUBLE;
case "money":
return ScalarType.createDecimalV3Type(19, 4);
case "smallmoney":
return ScalarType.createDecimalV3Type(10, 4);
case "decimal":
case "numeric":
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return ScalarType.createDecimalV3Type(precision, scale);
case "date":
return ScalarType.createDateV2Type();
case "datetime":
// datetime with millisecond precision
return ScalarType.createDatetimeV2Type(3);
case "datetime2":
// datetime2 with 100 nanoseconds precision, will lose precision
return ScalarType.createDatetimeV2Type(6);
case "smalldatetime":
// smalldatetime with second precision
return ScalarType.createDatetimeV2Type(0);
case "char":
case "varchar":
case "nchar":
case "nvarchar":
case "text":
case "ntext":
case "time":
case "datetimeoffset":
return ScalarType.createStringType();
case "image":
case "binary":
case "varbinary":
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcSapHanaClient extends JdbcClient {
protected JdbcSapHanaClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SELECT SCHEMA_NAME FROM SYS.SCHEMAS WHERE HAS_PRIVILEGES = 'TRUE'";
}
@Override
protected String[] getTableTypes() {
return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"};
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String hanaType = fieldSchema.getDataTypeName();
switch (hanaType) {
case "TINYINT":
return Type.TINYINT;
case "SMALLINT":
return Type.SMALLINT;
case "INTEGER":
return Type.INT;
case "BIGINT":
return Type.BIGINT;
case "SMALLDECIMAL":
case "DECIMAL": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
}
case "REAL":
return Type.FLOAT;
case "DOUBLE":
return Type.DOUBLE;
case "TIMESTAMP":
// TIMESTAMP with 100 nanoseconds precision, will lose precision
return ScalarType.createDatetimeV2Type(6);
case "SECONDDATE":
// SECONDDATE with second precision
return ScalarType.createDatetimeV2Type(0);
case "DATE":
return ScalarType.createDateV2Type();
case "BOOLEAN":
return Type.BOOLEAN;
case "CHAR":
case "NCHAR":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "TIME":
case "VARCHAR":
case "NVARCHAR":
case "ALPHANUM":
case "SHORTTEXT":
return ScalarType.createStringType();
case "BINARY":
case "VARBINARY":
case "BLOB":
case "CLOB":
case "NCLOB":
case "TEXT":
case "BINTEXT":
case "ST_GEOMETRY":
case "ST_POINT":
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.jdbc;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import java.sql.Connection;
import java.sql.SQLException;
public class JdbcTrinoClient extends JdbcClient {
protected JdbcTrinoClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SHOW SCHEMAS";
}
@Override
protected String getCatalogName(Connection conn) throws SQLException {
return conn.getCatalog();
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String trinoType = fieldSchema.getDataTypeName();
switch (trinoType) {
case "integer":
return Type.INT;
case "bigint":
return Type.BIGINT;
case "smallint":
return Type.SMALLINT;
case "tinyint":
return Type.TINYINT;
case "double":
return Type.DOUBLE;
case "real":
return Type.FLOAT;
case "boolean":
return Type.BOOLEAN;
case "date":
return ScalarType.createDateV2Type();
default:
break;
}
if (trinoType.startsWith("decimal")) {
String[] split = trinoType.split("\\(");
String[] precisionAndScale = split[1].split(",");
int precision = Integer.parseInt(precisionAndScale[0]);
int scale = Integer.parseInt(precisionAndScale[1].substring(0, precisionAndScale[1].length() - 1));
return createDecimalOrStringType(precision, scale);
}
if (trinoType.startsWith("char")) {
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
}
if (trinoType.startsWith("timestamp")) {
// timestamp with picoseconds precision, will lose precision
return ScalarType.createDatetimeV2Type(JDBC_DATETIME_SCALE);
}
if (trinoType.startsWith("array")) {
String trinoArrType = trinoType.substring(6, trinoType.length() - 1);
fieldSchema.setDataTypeName(trinoArrType);
Type type = jdbcTypeToDoris(fieldSchema);
return ArrayType.create(type, true);
}
if (trinoType.startsWith("varchar")) {
return ScalarType.createStringType();
}
return Type.UNSUPPORTED;
}
}

View File

@ -185,11 +185,7 @@ true abcHa1.12345 1.123450xkalowadawd 2022-10-01 3.14159 1 2 0 100000 1.2345678
doris_test
-- !specified_database_2 --
information_schema
init_db
mysql
performance_schema
sys
doris_test
-- !specified_database_3 --
information_schema
@ -199,11 +195,6 @@ performance_schema
sys
-- !specified_database_4 --
information_schema
init_db
mysql
performance_schema
sys
-- !ex_tb1 --
{"k1":"v1", "k2":"v2"}

View File

@ -2161,11 +2161,7 @@ doris3 20
doris_test
-- !specified_database_2 --
catalog_pg_test
information_schema
pg_catalog
pg_toast
public
doris_test
-- !specified_database_3 --
catalog_pg_test
@ -2175,11 +2171,6 @@ pg_toast
public
-- !specified_database_4 --
catalog_pg_test
information_schema
pg_catalog
pg_toast
public
-- !test_old --
123 abc